You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/05/15 15:39:33 UTC
svn commit: r406628 [9/10] - in /incubator/activemq/trunk/openwire-cpp: ./
src/gram/java/org/apache/activemq/openwire/tool/ src/gram/script/
src/main/cpp/activemq/ src/main/cpp/activemq/command/
src/main/cpp/activemq/protocol/ src/main/cpp/activemq/pro...
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/transport/tcp/TcpTransport.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/transport/tcp/TcpTransport.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/transport/tcp/TcpTransport.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/transport/tcp/TcpTransport.hpp Mon May 15 06:38:57 2006
@@ -26,7 +26,6 @@
#include <map>
#include "cms/CmsException.hpp"
#include "activemq/BrokerException.hpp"
-#include "activemq/ICommand.hpp"
#include "activemq/command/BaseCommand.hpp"
#include "activemq/command/Response.hpp"
#include "activemq/command/ExceptionResponse.hpp"
@@ -35,6 +34,10 @@
#include "activemq/transport/ITransport.hpp"
#include "activemq/transport/ICommandListener.hpp"
#include "ppr/InvalidOperationException.hpp"
+#include "ppr/io/DataInputStream.hpp"
+#include "ppr/io/DataOutputStream.hpp"
+#include "ppr/io/BufferedInputStream.hpp"
+#include "ppr/io/BufferedOutputStream.hpp"
#include "ppr/io/SocketInputStream.hpp"
#include "ppr/io/SocketOutputStream.hpp"
#include "ppr/net/ISocket.hpp"
@@ -73,14 +76,14 @@
class TcpTransport : public ITransport
{
private:
- p<IProtocol> protocol ;
- p<SocketInputStream> reader ;
- p<SocketOutputStream> writer ;
- p<ICommandListener> listener ;
- p<ReadThread> readThread ;
- p<ISocket> socket ;
- bool closed,
- started ;
+ p<IProtocol> protocol ;
+ p<DataInputStream> istream ;
+ p<DataOutputStream> ostream ;
+ p<ICommandListener> listener ;
+ p<ReadThread> readThread ;
+ p<ISocket> socket ;
+ bool closed,
+ started ;
public:
TcpTransport(p<ISocket> socket, p<IProtocol> wireProtocol) ;
@@ -90,9 +93,9 @@
virtual p<ICommandListener> getCommandListener() ;
virtual void start() ;
- virtual void oneway(p<ICommand> command) ;
- virtual p<FutureResponse> asyncRequest(p<ICommand> command) ;
- virtual p<Response> request(p<ICommand> command) ;
+ virtual void oneway(p<BaseCommand> command) ;
+ virtual p<FutureResponse> asyncRequest(p<BaseCommand> command) ;
+ virtual p<Response> request(p<BaseCommand> command) ;
public:
void readLoop() ;
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IBytesMessage.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IBytesMessage.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IBytesMessage.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IBytesMessage.hpp Mon May 15 06:38:57 2006
@@ -51,7 +51,7 @@
virtual int readInt() throw (MessageNotReadableException, MessageEOFException) = 0 ;
virtual long long readLong() throw (MessageNotReadableException, MessageEOFException) = 0 ;
virtual short readShort() throw (MessageNotReadableException, MessageEOFException) = 0 ;
- virtual p<string> readUTF() throw (MessageNotReadableException, MessageEOFException) = 0 ;
+ virtual p<string> readString() throw (MessageNotReadableException, MessageEOFException) = 0 ;
virtual void writeBoolean(bool value) throw (MessageNotWritableException) = 0 ;
virtual void writeByte(char value) throw (MessageNotWritableException) = 0 ;
virtual void writeBytes(char* value, int index, int length) throw (MessageNotWritableException) = 0 ;
@@ -60,7 +60,7 @@
virtual void writeInt(int value) throw (MessageNotWritableException) = 0 ;
virtual void writeLong(long long value) throw (MessageNotWritableException) = 0 ;
virtual void writeShort(short value) throw (MessageNotWritableException) = 0 ;
- virtual void writeUTF(const char* value) throw (MessageNotWritableException) = 0 ;
+ virtual void writeString(const char* value) throw (MessageNotWritableException) = 0 ;
} ;
/* namespace */
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IMapMessage.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IMapMessage.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IMapMessage.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IMapMessage.hpp Mon May 15 06:38:57 2006
@@ -23,6 +23,8 @@
#endif
#include "cms/IMessage.hpp"
+#include "cms/MessageFormatException.hpp"
+#include "ppr/IllegalArgumentException.hpp"
#include "ppr/util/MapItemHolder.hpp"
#include "ppr/util/ifr/p"
@@ -30,6 +32,7 @@
{
namespace cms
{
+ using namespace apache::ppr;
using namespace apache::ppr::util;
using namespace ifr;
@@ -39,6 +42,24 @@
struct IMapMessage : IMessage
{
virtual p<PropertyMap> getBody() = 0 ;
+ virtual bool getBoolean(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setBoolean(const char* name, bool value) throw (IllegalArgumentException) = 0 ;
+ virtual char getByte(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setByte(const char* name, char value) throw (IllegalArgumentException) = 0 ;
+ virtual array<char> getBytes(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setBytes(const char* name, array<char> value) throw (IllegalArgumentException) = 0 ;
+ virtual double getDouble(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setDouble(const char* name, double value) throw (IllegalArgumentException) = 0 ;
+ virtual float getFloat(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setFloat(const char* name, float value) throw (IllegalArgumentException) = 0 ;
+ virtual int getInt(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setInt(const char* name, int value) throw (IllegalArgumentException) = 0 ;
+ virtual long long getLong(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setLong(const char* name, long long value) throw (IllegalArgumentException) = 0 ;
+ virtual short getShort(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setShort(const char* name, short value) throw (IllegalArgumentException) = 0 ;
+ virtual p<string> getString(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+ virtual void setString(const char* name, const char* value) throw (IllegalArgumentException) = 0 ;
} ;
/* namespace */
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ppr/io/BufferedInputStream.hpp"
+
+using namespace apache::ppr::io;
+
+/*
+ * Wrap istream with an internal buffer of DEFAULT_SIZE chars; the buffer starts out empty.
+ */
+BufferedInputStream::BufferedInputStream(p<IInputStream> istream)
+{
+ this->istream = istream ;
+ this->size = DEFAULT_SIZE ;
+ buffer = new char[size] ;
+ position = 0 ;   // next index to hand out from buffer
+ treshold = 0 ;   // position >= treshold, so isEOB() is true until loadBuffer() succeeds
+}
+
+/*
+ * Wrap istream with an internal buffer of size chars. NOTE(review): size is not validated - confirm callers pass a positive value.
+ */
+BufferedInputStream::BufferedInputStream(p<IInputStream> istream, int size)
+{
+ this->istream = istream ;
+ this->size = size ;
+ buffer = new char[size] ;
+ position = 0 ;   // next index to hand out from buffer
+ treshold = 0 ;   // position >= treshold, so isEOB() is true until loadBuffer() succeeds
+}
+
+/*
+ * Close stream: cascades the request to the underlying stream, then frees the internal buffer.
+ */
+void BufferedInputStream::close() throw(IOException)
+{
+    if( istream != NULL )   // NULL means already closed, so repeated calls are a no-op
+    {
+        istream->close() ;
+        istream = NULL ;
+        delete[] buffer ;   // allocated with new[] in the constructors
+        buffer = NULL ;
+    }
+}
+
+/*
+ * Read up to length bytes into buf starting at buf[offset]; returns the count read (may be fewer) or -1 if none.
+ */
+int BufferedInputStream::read(char* buf, int offset, int length) throw(IOException)
+{
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
+    // Assert parameters; offset and length are independent, so offset may exceed length
+    // NOTE(review): IllegalArgumentException is not in the exception specification - confirm it derives from IOException
+    if( buf == NULL || offset < 0 || length < 0 )
+        throw IllegalArgumentException() ;
+
+    // Nothing to do for an empty request
+    if( length == 0 )
+        return 0 ;
+
+    // Have we reached end-of-buffer?
+    if( isEOB() )
+    {
+        // Skip buffering should request be at least as large as internal buffer
+        if( length >= size )
+            return istream->read(buf, offset, length) ;
+
+        // Load internal buffer with new data
+        loadBuffer() ;
+    }
+    // Still nothing buffered, i.e. the underlying stream delivered no data
+    if( isEOB() )
+        return -1 ;
+
+    // Serve at most what is left in the buffer; callers must check the returned count
+    int available = treshold - position ;
+    int bytesRead = ( length < available ) ? length : available ;
+
+    // Copy buffered bytes into supplied buffer and advance the read position
+    memcpy(&buf[offset], &buffer[position], bytesRead) ;
+    position += bytesRead ;
+
+    return bytesRead ;
+}
+
+/*
+ * Load the input buffer with new data from the underlying stream.
+ */
+void BufferedInputStream::loadBuffer() throw(IOException)
+{
+ // Try to load the whole buffer
+ int bytesRead = istream->read(buffer, 0, size) ;
+
+ // Reset counters if any data was loaded; otherwise position and treshold are left unchanged
+ if( bytesRead > 0 )
+ {
+ treshold = bytesRead ;
+ position = 0 ;
+ }
+}
+
+/*
+ * Throw an IOException if the stream has been closed, i.e. istream is NULL.
+ */
+void BufferedInputStream::checkClosed() throw(IOException)
+{
+ if( istream == NULL )
+ throw IOException("Input stream closed") ;
+}
+
+/*
+ * Check if end-of-buffer has been reached, i.e. no unread bytes remain in the internal buffer.
+ */
+bool BufferedInputStream::isEOB()
+{
+    return position >= treshold ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_BufferedInputStream_hpp_
+#define Ppr_BufferedInputStream_hpp_
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+#include <stdlib.h>
+#include "ppr/IllegalArgumentException.hpp"
+#include "ppr/io/IInputStream.hpp"
+#include "ppr/io/IOException.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace ppr
+ {
+ namespace io
+ {
+ using namespace ifr ;
+ using namespace apache::ppr ;
+ using namespace apache::ppr::thread ;
+
+/*
+ * Buffers bytes to provide more efficient reading from an input stream.
+ * NOTE(review): no destructor is declared for the raw char buffer - confirm where it is released.
+ */
+class BufferedInputStream : public IInputStream
+{
+private:
+ p<IInputStream> istream ;   // underlying stream; NULL once closed
+ char* buffer ;   // internal buffer of size chars
+ int size, position, treshold ;   // buffer capacity, next read index, end of valid data
+
+ // Default buffer size, in chars
+ static const int DEFAULT_SIZE = 10240 ;
+
+public:
+ BufferedInputStream(p<IInputStream> istream) ;
+ BufferedInputStream(p<IInputStream> istream, int size) ;
+
+ virtual void close() throw(IOException) ;
+ virtual int read(char* buf, int offset, int length) throw(IOException) ;
+
+private:
+ void checkClosed() throw(IOException) ;
+ void loadBuffer() throw(IOException) ;
+ bool isEOB() ;
+} ;
+
+/* namespace */
+ }
+ }
+}
+
+#endif /*Ppr_BufferedInputStream_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ppr/io/BufferedOutputStream.hpp"
+
+using namespace apache::ppr::io;
+
+/*
+ * Wrap ostream with an internal buffer of DEFAULT_SIZE chars; the buffer starts out empty.
+ */
+BufferedOutputStream::BufferedOutputStream(p<IOutputStream> ostream)
+{
+ this->ostream = ostream ;
+ this->size = DEFAULT_SIZE ;
+ buffer = new char[size] ;
+ position = 0 ;   // next free index in buffer
+ treshold = size ;   // isEOB() reports a full buffer once position reaches this
+}
+
+/*
+ * Wrap ostream with an internal buffer of size chars. NOTE(review): size is not validated - confirm callers pass a positive value.
+ */
+BufferedOutputStream::BufferedOutputStream(p<IOutputStream> ostream, int size)
+{
+ this->ostream = ostream ;
+ this->size = size ;
+ buffer = new char[size] ;
+ position = 0 ;   // next free index in buffer
+ treshold = size ;   // isEOB() reports a full buffer once position reaches this
+}
+
+/*
+ * Close stream: flushes buffered bytes, closes the underlying stream and frees the internal buffer.
+ */
+void BufferedOutputStream::close() throw(IOException)
+{
+    // Cascade close request to internal stream; NULL means already closed
+    if( ostream != NULL )
+    {
+        // Flush any remaining bytes
+        flush0() ;
+        // Shut down, then release the buffer allocated with new[] in the constructors
+        ostream->close() ;
+        ostream = NULL ;
+        delete[] buffer ;
+        buffer = NULL ;
+    }
+}
+
+/*
+ * Write length bytes from buf[offset]; returns length, or the underlying result when the buffer is bypassed.
+ */
+int BufferedOutputStream::write(const char* buf, int offset, int length) throw(IOException)
+{
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
+    // Assert parameters; offset and length are independent, so offset may exceed length
+    if( buf == NULL || offset < 0 || length < 0 )
+        throw IllegalArgumentException() ;
+
+    // Nothing to do for an empty request
+    if( length == 0 )
+        return 0 ;
+
+    // Skip buffering should request be at least as large as internal buffer
+    if( length >= size )
+    {
+        flush0() ;   // earlier buffered bytes must reach the stream first
+        return ostream->write(buf, offset, length) ;
+    }
+    int start = offset, end = offset + length ;
+
+    while( start < end )
+    {
+        int room = treshold - position ;   // free space left in the buffer
+        int delta = ( end - start < room ) ? end - start : room ;
+        memcpy(&buffer[position], &buf[start], delta) ;
+        start += delta ;
+        position += delta ;
+        // Have we reached end-of-buffer?
+        if( isEOB() )
+            flush0() ;
+    }
+    return length ;
+}
+
+/*
+ * Flush stream, i.e. write out the buffer and flush the underlying stream; throws if closed.
+ */
+void BufferedOutputStream::flush() throw(IOException)
+{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
+ flush0() ;
+}
+
+/*
+ * Flush buffer without a closed-stream check; close(), write() and flush() verify ostream first.
+ */
+void BufferedOutputStream::flush0() throw(IOException)
+{
+ // Write out buffered bytes, if any (NOTE(review): the count returned by write() is ignored)
+ if( position > 0 )
+ {
+ ostream->write(buffer, 0, position) ;
+ position = 0 ;
+ }
+ // Flush underlying stream, even when nothing was buffered
+ ostream->flush() ;
+}
+
+/*
+ * Throw an IOException if the stream has been closed, i.e. ostream is NULL.
+ */
+void BufferedOutputStream::checkClosed() throw(IOException)
+{
+ if( ostream == NULL )
+ throw IOException("Output stream closed") ;
+}
+
+/*
+ * Check if end-of-buffer has been reached, i.e. the internal buffer is full.
+ */
+bool BufferedOutputStream::isEOB()
+{
+    return position >= treshold ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_BufferedOutputStream_hpp_
+#define Ppr_BufferedOutputStream_hpp_
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+#include <stdlib.h>
+#include "ppr/IllegalArgumentException.hpp"
+#include "ppr/io/IOutputStream.hpp"
+#include "ppr/io/IOException.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace ppr
+ {
+ namespace io
+ {
+ using namespace ifr ;
+ using namespace apache::ppr ;
+ using namespace apache::ppr::thread ;
+
+/*
+ * Buffers bytes to provide more efficient writing to an output stream.
+ * NOTE(review): no destructor is declared for the raw char buffer - confirm where it is released.
+ */
+class BufferedOutputStream : public IOutputStream
+{
+private:
+ p<IOutputStream> ostream ;   // underlying stream; NULL once closed
+ char* buffer ;   // internal buffer of size chars
+ int size, position, treshold ;   // buffer capacity, next write index, fill level that triggers a flush
+
+ // Default buffer size, in chars
+ static const int DEFAULT_SIZE = 10240 ;
+
+public:
+ BufferedOutputStream(p<IOutputStream> ostream) ;
+ BufferedOutputStream(p<IOutputStream> ostream, int size) ;
+
+ virtual void close() throw(IOException) ;
+ virtual void flush() throw(IOException) ;
+ virtual int write(const char* buf, int offset, int length) throw(IOException) ;
+
+private:
+ void checkClosed() throw(IOException) ;
+ void flush0() throw(IOException) ;
+ bool isEOB() ;
+} ;
+
+/* namespace */
+ }
+ }
+}
+
+#endif /*Ppr_BufferedOutputStream_hpp_*/
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayInputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayInputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayInputStream.hpp Mon May 15 06:38:57 2006
@@ -17,7 +17,7 @@
#ifndef Ppr_ByteArrayInputStream_hpp_
#define Ppr_ByteArrayInputStream_hpp_
-#include "ppr/io/DataInputStream.hpp"
+#include "ppr/io/IInputStream.hpp"
#include "ppr/io/EOFException.hpp"
#include "ppr/util/ifr/array"
#include "ppr/util/ifr/p"
@@ -34,7 +34,7 @@
* ByteArrayInputStream reads primitive C++ data types from an
* in-memory byte array.
*/
-class ByteArrayInputStream : public DataInputStream
+class ByteArrayInputStream : public IInputStream
{
private:
array<char> body ;
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayOutputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayOutputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayOutputStream.hpp Mon May 15 06:38:57 2006
@@ -17,7 +17,7 @@
#ifndef Ppr_ByteArrayOutputStream_hpp_
#define Ppr_ByteArrayOutputStream_hpp_
-#include "ppr/io/DataOutputStream.hpp"
+#include "ppr/io/IOutputStream.hpp"
#include "ppr/util/ifr/array"
#include "ppr/util/ifr/p"
@@ -33,7 +33,7 @@
* ByteArrayOutputStream writes primitive C++ data types to a
* in memory byte array.
*/
-class ByteArrayOutputStream : public DataOutputStream
+class ByteArrayOutputStream : public IOutputStream
{
private:
array<char> body ;
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.cpp Mon May 15 06:38:57 2006
@@ -21,9 +21,19 @@
/*
*
*/
-DataInputStream::DataInputStream()
+DataInputStream::DataInputStream(p<IInputStream> istream)
{
- // no-op
+ this->istream = istream ;
+ this->encoder = CharsetEncoderRegistry::getEncoder() ;
+}
+
+/*
+ * Wrap istream and use the encoder that CharsetEncoderRegistry::getEncoder(encname) returns.
+ */
+DataInputStream::DataInputStream(p<IInputStream> istream, const char* encname)
+{
+ this->istream = istream ;
+ this->encoder = CharsetEncoderRegistry::getEncoder(encname) ;
}
/*
@@ -37,12 +47,40 @@
/*
*
*/
+void DataInputStream::close() throw(IOException)
+{
+ // Cascade close request to underlying stream; a NULL istream means already closed
+ if( istream != NULL )
+ {
+ istream->close() ;
+ istream = NULL ;
+ }
+}
+
+/*
+ * Forward a raw read to the underlying stream and return its result; throws if closed.
+ */
+int DataInputStream::read(char* buffer, int offset, int length) throw(IOException)
+{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
+ // Delegate unchanged; no byte order conversion or decoding is applied here
+ return istream->read(buffer, offset, length) ;
+}
+
+/*
+ *
+ */
char DataInputStream::readByte() throw(IOException)
{
char value ;
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Read a single byte
- read(&value, 0, sizeof(char)) ;
+ istream->read(&value, 0, sizeof(char)) ;
return value ;
}
@@ -62,8 +100,11 @@
{
double value ;
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Read a double and convert from big endian to little endian if necessary
- read((char*)&value, 0, sizeof(double) ) ;
+ istream->read((char*)&value, 0, sizeof(double) ) ;
return ntohd(value) ;
}
@@ -74,8 +115,11 @@
{
float value ;
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Read a float and convert from big endian to little endian if necessary
- read((char*)&value, 0, sizeof(float)) ;
+ istream->read((char*)&value, 0, sizeof(float)) ;
return ntohf(value) ;
}
@@ -86,8 +130,11 @@
{
short value ;
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Read a short and convert from big endian to little endian if necessary
- read((char*)&value, 0, sizeof(short)) ;
+ istream->read((char*)&value, 0, sizeof(short)) ;
return ntohs(value) ;
}
@@ -98,8 +145,11 @@
{
int value ;
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Read an int and convert from big endian to little endian if necessary
- read((char*)&value, 0, sizeof(int)) ;
+ istream->read((char*)&value, 0, sizeof(int)) ;
return ntohi(value) ;
}
@@ -110,8 +160,11 @@
{
long long value ;
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Read a long long and convert from big endian to little endian if necessary
- read((char*)&value, 0, sizeof(long long)) ;
+ istream->read((char*)&value, 0, sizeof(long long)) ;
return ntohll(value) ;
}
@@ -121,26 +174,42 @@
p<string> DataInputStream::readString() throw(IOException)
{
p<string> value ;
- char* buffer ;
short length ;
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Read length of string
length = readShort() ;
- if (length < 0) {
- throw IOException ("Negative length of string");
- } else if (length > 0) {
- buffer = new char[length+1] ;
+ if (length < 0)
+ throw IOException ("Negative length of string") ;
+ else if (length > 0)
+ {
+ array<char> buffer = array<char> (length+1) ;
// Read string bytes
- read(buffer, 0, length) ;
- *(buffer+length) = '\0' ;
+ istream->read(&buffer[0], 0, length) ;
+ buffer[length] = '\0' ;
- // Create string class
+ // Create string
value = new string() ;
- value->assign(buffer) ;
+ value->assign( buffer.c_array() ) ;
- return value ;
- } else {
- return new string ("");
+ // Decode string if charset encoder has been configured
+ if( encoder != NULL )
+ value = encoder->decode(value) ;
}
+ else // ...empty string
+ value = new string("") ;
+
+ return value ;
+}
+
+/*
+ * Throw an IOException if the stream has been closed, i.e. istream is NULL.
+ */
+void DataInputStream::checkClosed() throw(IOException)
+{
+ if( istream == NULL )
+ throw IOException("Input stream closed") ;
}
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.hpp Mon May 15 06:38:57 2006
@@ -18,6 +18,8 @@
#define Ppr_DataInputStream_hpp_
#include "ppr/io/IInputStream.hpp"
+#include "ppr/io/encoding/ICharsetEncoder.hpp"
+#include "ppr/io/encoding/CharsetEncoderRegistry.hpp"
#include "ppr/util/Endian.hpp"
#include "ppr/util/ifr/p"
@@ -27,13 +29,14 @@
{
namespace io
{
- using namespace apache::ppr::util; //htonx and ntohx functions.
using namespace ifr;
+ using namespace apache::ppr::util; //htonx and ntohx functions.
+ using namespace apache::ppr::io::encoding;
/*
* The DataInputStream class reads primitive C++ data types from an
* underlying input stream in a Java compatible way. Strings are
- * read as raw bytes, no character decoding is performed.
+ * read as raw bytes, or decoded if a charset encoder has been configured.
*
* All numeric data types are assumed to be available in big
* endian (network byte order) and are converted automatically
@@ -43,12 +46,17 @@
*/
class DataInputStream : public IInputStream
{
+private:
+ p<IInputStream> istream ;
+ p<ICharsetEncoder> encoder ;
+
public:
- DataInputStream() ;
+ DataInputStream(p<IInputStream> istream) ;
+ DataInputStream(p<IInputStream> istream, const char* encname) ;
virtual ~DataInputStream() ;
- virtual void close() throw(IOException) = 0 ;
- virtual int read(char* buffer, int index, int count) throw(IOException) = 0 ;
+ virtual void close() throw(IOException) ;
+ virtual int read(char* buffer, int offset, int length) throw(IOException) ;
virtual char readByte() throw(IOException) ;
virtual bool readBoolean() throw(IOException) ;
virtual double readDouble() throw(IOException) ;
@@ -57,6 +65,9 @@
virtual int readInt() throw(IOException) ;
virtual long long readLong() throw(IOException) ;
virtual p<string> readString() throw(IOException) ;
+
+protected:
+ void checkClosed() throw(IOException) ;
} ;
/* namespace */
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.cpp Mon May 15 06:38:57 2006
@@ -18,65 +18,194 @@
using namespace apache::ppr::io;
+/*
+ * Wrap ostream and use the encoder that CharsetEncoderRegistry::getEncoder() returns.
+ */
+DataOutputStream::DataOutputStream(p<IOutputStream> ostream)
+{
+ this->ostream = ostream ;
+ this->encoder = CharsetEncoderRegistry::getEncoder() ;
+}
+
+/*
+ * Wrap ostream and use the encoder that CharsetEncoderRegistry::getEncoder(encname) returns.
+ */
+DataOutputStream::DataOutputStream(p<IOutputStream> ostream, const char* encname)
+{
+ this->ostream = ostream ;
+ this->encoder = CharsetEncoderRegistry::getEncoder(encname) ;
+}
+
+/*
+ * Destructor; the body is empty and in particular does not call close() or flush().
+ */
+DataOutputStream::~DataOutputStream()
+{
+ // no-op
+}
+
+/*
+ * Close the underlying stream and drop the reference to it; repeated calls are a no-op.
+ */
+void DataOutputStream::close() throw(IOException)
+{
+ // Cascade close request to underlying stream; a NULL ostream means already closed
+ if( ostream != NULL )
+ {
+ ostream->close() ;
+ ostream = NULL ;
+ }
+}
+
+/*
+ * Flush the underlying stream; throws an IOException if this stream has been closed.
+ */
+void DataOutputStream::flush() throw(IOException)
+{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
+ // Flush stream
+ ostream->flush() ;
+}
+
+/*
+ * Forward a raw write to the underlying stream and return its result; throws if closed.
+ */
+int DataOutputStream::write(const char* buffer, int offset, int length) throw(IOException)
+{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
+ // Delegate unchanged; no byte order conversion or encoding is applied here
+ return ostream->write(buffer, offset, length) ;
+}
+
+/*
+ *
+ */
void DataOutputStream::writeByte(char value) throw(IOException)
{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Write a single byte
- write((char*)&value, 0, sizeof(char)) ;
+ ostream->write((char*)&value, 0, sizeof(char)) ;
}
+/*
+ *
+ */
void DataOutputStream::writeBoolean(bool value) throw(IOException)
{
// Write a boolean
( value ) ? writeByte(0x01) : writeByte(0x00) ;
}
+/*
+ *
+ */
void DataOutputStream::writeDouble(double v) throw(IOException)
{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Write a double, convert from little endian to big endian if necessary
double value = htond(v) ;
- write((char*)&value, 0, sizeof(double)) ;
+ ostream->write((char*)&value, 0, sizeof(double)) ;
}
+/*
+ *
+ */
void DataOutputStream::writeFloat(float v) throw(IOException)
{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Write a float, convert from little endian to big endian if necessary
float value = htonf(v) ;
- write((char*)&value, 0, sizeof(float)) ;
+ ostream->write((char*)&value, 0, sizeof(float)) ;
}
+/*
+ *
+ */
void DataOutputStream::writeShort(short v) throw(IOException)
{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Write a short, convert from little endian to big endian if necessary
short value = htons(v) ;
- write((char*)&value, 0, sizeof(short)) ;
+ ostream->write((char*)&value, 0, sizeof(short)) ;
}
+/*
+ *
+ */
void DataOutputStream::writeInt(int v) throw(IOException)
{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Write an int, convert from little endian to big endian if necessary
int value = htoni(v) ;
- write((char*)&value, 0, sizeof(int)) ;
+ ostream->write((char*)&value, 0, sizeof(int)) ;
}
+/*
+ *
+ */
void DataOutputStream::writeLong(long long v) throw(IOException)
{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Write a long long, convert from little endian to big endian if necessary
long long value = htonll(v) ;
- write((char*)&value, 0, sizeof(long long)) ;
+ ostream->write((char*)&value, 0, sizeof(long long)) ;
}
-void DataOutputStream::writeString(p<string> value) throw(IOException)
+/*
+ * Writes a length-prefixed string: the length is written first via
+ * writeShort(), followed by the string bytes. When an encoder is
+ * configured, the encoded string and the length reported by the
+ * encoder are written instead of the raw value.
+ *
+ * Returns the length that was passed to the underlying write (the
+ * prefix is not counted), or 0 without writing anything when value
+ * is NULL. Throws IOException when the stream has been closed or
+ * the (possibly encoded) length exceeds USHRT_MAX.
+ */
+int DataOutputStream::writeString(p<string> value) throw(IOException)
{
+ // Check if underlying stream has been closed
+ checkClosed() ;
+
// Assert argument
if( value == NULL )
- return ;
- if( value->length() > USHRT_MAX )
+ return 0 ;
+
+ p<string> data ;
+ int length = (int)value->length() ;
+
+ // Encode string if a charset encoder has been configured
+ if( encoder != NULL )
+ data = encoder->encode(value, &length) ;
+ else
+ data = value ;
+
+ // Assert string length
+ if( length > USHRT_MAX )
throw IOException("String length exceeds maximum length") ;
// Write length of string
- short length = (short)value->length() ;
- writeShort( length ) ;
+ writeShort( (short)length ) ;
// Write string contents
- write((char*)value->c_str(), 0, length) ;
+ ostream->write((char*)data->c_str(), 0, length) ;
+
+ return length ;
+}
+
+/*
+ * Check if stream has been closed.
+ * Throws IOException("Output stream closed") when the underlying
+ * stream pointer is NULL; otherwise does nothing.
+ */
+void DataOutputStream::checkClosed() throw(IOException)
+{
+ if( ostream == NULL )
+ throw IOException("Output stream closed") ;
}
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.hpp Mon May 15 06:38:57 2006
@@ -18,6 +18,8 @@
#define Ppr_DataOutputStream_hpp_
#include "ppr/io/IOutputStream.hpp"
+#include "ppr/io/encoding/ICharsetEncoder.hpp"
+#include "ppr/io/encoding/CharsetEncoderRegistry.hpp"
#include "ppr/util/Endian.hpp"
#include "ppr/util/ifr/p"
@@ -27,13 +29,15 @@
{
namespace io
{
- using namespace apache::ppr::util; // htonx and ntohx functions.
using namespace ifr;
+ using namespace apache::ppr::util; // htonx and ntohx functions.
+ using namespace apache::ppr::io::encoding;
/*
* The DataOutputStream class writes primitive C++ data types to an
* underlying output stream in a Java compatible way. Strings
- * are written as raw bytes, no character encoding is performed.
+ * are written as either raw bytes or encoded should encoding
+ * have been configured.
*
* All numeric data types are written in big endian (network byte
* order) and if the platform is little endian they are converted
@@ -43,10 +47,18 @@
*/
class DataOutputStream : public IOutputStream
{
+private:
+ p<IOutputStream> ostream ;
+ p<ICharsetEncoder> encoder ;
+
public:
- virtual void close() throw(IOException) = 0 ;
- virtual void flush() throw(IOException) = 0 ;
- virtual int write(const char* buffer, int index, int count) throw(IOException) = 0 ;
+ DataOutputStream(p<IOutputStream> ostream) ;
+ DataOutputStream(p<IOutputStream> ostream, const char* encname) ;
+ virtual ~DataOutputStream() ;
+
+ virtual void close() throw(IOException) ;
+ virtual void flush() throw(IOException) ;
+ virtual int write(const char* buffer, int offset, int length) throw(IOException) ;
virtual void writeByte(char v) throw(IOException) ;
virtual void writeBoolean(bool v) throw(IOException) ;
virtual void writeDouble(double v) throw(IOException) ;
@@ -54,7 +66,10 @@
virtual void writeShort(short v) throw(IOException) ;
virtual void writeInt(int v) throw(IOException) ;
virtual void writeLong(long long v) throw(IOException) ;
- virtual void writeString(p<string> v) throw(IOException) ;
+ virtual int writeString(p<string> v) throw(IOException) ;
+
+protected:
+ void checkClosed() throw(IOException) ;
} ;
/* namespace */
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IInputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IInputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IInputStream.hpp Mon May 15 06:38:57 2006
@@ -45,14 +45,6 @@
{
virtual void close() throw(IOException) = 0 ;
virtual int read(char* buffer, int index, int count) throw(IOException) = 0 ;
- virtual char readByte() throw(IOException) = 0 ;
- virtual bool readBoolean() throw(IOException) = 0 ;
- virtual double readDouble() throw(IOException) = 0 ;
- virtual float readFloat() throw(IOException) = 0 ;
- virtual short readShort() throw(IOException) = 0 ;
- virtual int readInt() throw(IOException) = 0 ;
- virtual long long readLong() throw(IOException) = 0 ;
- virtual p<string> readString() throw(IOException) = 0 ;
} ;
/* namespace */
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IOutputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IOutputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IOutputStream.hpp Mon May 15 06:38:57 2006
@@ -46,15 +46,7 @@
{
virtual void close() throw(IOException) = 0 ;
virtual void flush() throw(IOException) = 0 ;
- virtual int write(const char* buffer, int index, int count) throw(IOException) = 0 ;
- virtual void writeByte(char v) throw(IOException) = 0 ;
- virtual void writeBoolean(bool v) throw(IOException) = 0 ;
- virtual void writeDouble(double v) throw(IOException) = 0 ;
- virtual void writeFloat(float v) throw(IOException) = 0 ;
- virtual void writeShort(short v) throw(IOException) = 0 ;
- virtual void writeInt(int v) throw(IOException) = 0 ;
- virtual void writeLong(long long v) throw(IOException) = 0 ;
- virtual void writeString(p<string> v) throw(IOException) = 0 ;
+ virtual int write(const char* buffer, int offset, int length) throw(IOException) = 0 ;
} ;
/* namespace */
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.cpp Mon May 15 06:38:57 2006
@@ -18,7 +18,18 @@
using namespace apache::ppr::io;
-void SocketInputStream::close() throw(IOException)
+/*
+ * Creates an input stream bound to the given socket; the socket
+ * reference is stored for later read() and close() calls.
+ */
+SocketInputStream::SocketInputStream(p<ISocket> socket)
+{
+ this->socket = socket ;
+}
+
+/*
+ * Destructor. The body is empty; the socket is not explicitly
+ * closed here.
+ */
+SocketInputStream::~SocketInputStream()
{
// no-op
}
@@ -26,21 +37,37 @@
/*
* Closes the underlying socket if one is still set and then clears
* the reference, so a second call finds NULL and does nothing.
*/
-int SocketInputStream::read(char* buf, int index, int size) throw(IOException)
+void SocketInputStream::close() throw(IOException)
{
- char* buffer = buf + index ;
- int length, remaining = size ;
+ // Cascade close request to underlying socket
+ if( socket != NULL )
+ {
+ socket->close() ;
+ socket = NULL ;
+ }
+}
+
+/*
+ * Reads from the socket into buf starting at offset, issuing a
+ * single socket->receive() call for at most length bytes, and
+ * returns the byte count reported by that call. The earlier loop
+ * that kept receiving until the full request was satisfied is
+ * left commented out below, so the result is presumably allowed
+ * to be smaller than length.
+ * NOTE(review): the SocketException handler body lies outside the
+ * visible hunk; what it throws should be confirmed in the file.
+ */
+int SocketInputStream::read(char* buf, int offset, int length) throw(IOException)
+{
+ char* buffer = buf + offset ;
+ //char* buffer = buf + index ;
+ //int length, remaining = size ;
+ int bytesRead ;
// Loop until requested number of bytes are read
- while( remaining > 0 )
- {
+ //while( remaining > 0 )
+ //{
// Try to read remaining bytes
- length = remaining ;
+ // length = remaining ;
try
{
// Read some bytes from socket
- length = socket->receive(buffer, length) ;
+ //length = socket->receive(buffer, length) ;
+ bytesRead = socket->receive(buffer, length) ;
}
catch( SocketException se )
{
@@ -49,8 +76,9 @@
}
// Adjust buffer pointer and remaining number of bytes
- buffer += length ;
- remaining -= length ;
- }
- return size ;
+ //buffer += length ;
+ //remaining -= length ;
+ //}
+ //return size ;
+ return bytesRead ;
}
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.hpp Mon May 15 06:38:57 2006
@@ -17,7 +17,7 @@
#ifndef Ppr_SocketInputStream_hpp_
#define Ppr_SocketInputStream_hpp_
-#include "ppr/io/DataInputStream.hpp"
+#include "ppr/io/IInputStream.hpp"
#include "ppr/net/ISocket.hpp"
#include "ppr/net/SocketException.hpp"
#include "ppr/util/ifr/p"
@@ -30,22 +30,23 @@
{
using namespace ifr ;
using namespace apache::ppr::net ;
- using namespace apache::ppr::util ;
/*
* SocketInputStream reads primitive C++ data types from a
* socket stream. It currently uses PPR sockets for
* platform independency.
*
* With this change the class derives from IInputStream instead of
* DataInputStream, so the class itself declares only close() and
* the raw byte read().
*/
-class SocketInputStream : public DataInputStream
+class SocketInputStream : public IInputStream
{
private:
p<ISocket> socket ; // Socket that read() receives from and close() closes
+
public:
- SocketInputStream(p<ISocket> socket) : socket (socket) {}
-public:
+ SocketInputStream(p<ISocket> socket) ;
+ virtual ~SocketInputStream() ;
+
virtual void close() throw(IOException) ;
- virtual int read(char* buffer, int index, int size) throw(IOException) ;
+ virtual int read(char* buffer, int offset, int length) throw(IOException) ;
} ;
/* namespace */
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.cpp Mon May 15 06:38:57 2006
@@ -34,11 +34,22 @@
// no-op
}
+/*
+ * Closes the underlying socket if one is still set and then clears
+ * the reference, so a second call finds NULL and does nothing.
+ */
void SocketOutputStream::close() throw(IOException)
{
- // no-op
+ // Cascade close request to underlying socket
+ if( socket != NULL )
+ {
+ socket->close() ;
+ socket = NULL ;
+ }
}
+/*
+ *
+ */
void SocketOutputStream::flush() throw(IOException)
{
// no-op
@@ -47,21 +58,24 @@
/*
* Writes bytes from buf starting at offset to the socket, issuing a
* single socket->send() call for at most length bytes, and returns
* the byte count reported by that call. The earlier loop that kept
* sending until the full request was written is left commented out
* below, so the result is presumably allowed to be smaller than
* length.
* NOTE(review): the SocketException handler body lies outside the
* visible hunk; what it throws should be confirmed in the file.
*/
-int SocketOutputStream::write(const char* buf, int index, int size) throw(IOException)
+int SocketOutputStream::write(const char* buf, int offset, int length) throw(IOException)
{
- const char* buffer = buf + index ;
- int length, remaining = size ;
+ const char* buffer = buf + offset ;
+ //const char* buffer = buf + index ;
+ //int length, remaining = size ;
+ int bytesWritten ;
// Loop until requested number of bytes are read
- while( remaining > 0 )
- {
+ //while( remaining > 0 )
+ //{
// Try to write remaining bytes
- length = remaining ;
+ //length = remaining ;
try
{
// Write some bytes to socket
- length = socket->send(buffer, length) ;
+ //length = socket->send(buffer, length) ;
+ bytesWritten = socket->send(buffer, length) ;
}
catch( SocketException se )
{
@@ -70,8 +84,9 @@
}
// Adjust buffer pointer and remaining number of bytes
- buffer += length ;
- remaining -= length ;
- }
- return size ;
+ //buffer += length ;
+ //remaining -= length ;
+ //}
+ //return size ;
+ return bytesWritten ;
}
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.hpp Mon May 15 06:38:57 2006
@@ -17,7 +17,7 @@
#ifndef Ppr_SocketOutputStream_hpp_
#define Ppr_SocketOutputStream_hpp_
-#include "ppr/io/DataOutputStream.hpp"
+#include "ppr/io/IOutputStream.hpp"
#include "ppr/net/ISocket.hpp"
#include "ppr/net/SocketException.hpp"
#include "ppr/util/ifr/p"
@@ -30,14 +30,13 @@
{
using namespace ifr ;
using namespace apache::ppr::net ;
- using namespace apache::ppr::util ;
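/*
* SocketOutputStream writes primitive C++ data types to a
* socket stream. It currently uses PPR sockets for
* platform independency.
*
* With this change the class derives from IOutputStream instead of
* DataOutputStream, i.e. from the raw byte stream interface.
*/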
-class SocketOutputStream : public DataOutputStream
+class SocketOutputStream : public IOutputStream
{
private:
p<ISocket> socket ;
@@ -48,7 +47,7 @@
virtual void close() throw(IOException) ;
virtual void flush() throw(IOException) ;
- virtual int write(const char* buffer, int index, int size) throw(IOException) ;
+ virtual int write(const char* buffer, int offset, int length) throw(IOException) ;
} ;
/* namespace */
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ppr/io/encoding/AsciiToUTF8Encoder.hpp"
+
+using namespace apache::ppr::io::encoding;
+
+
+// Init static members
+const char* AsciiToUTF8Encoder::NAME = "AsciiToUTF8" ;
+
+/*
+ * Default constructor; there is nothing to initialize.
+ */
+AsciiToUTF8Encoder::AsciiToUTF8Encoder()
+{
+ // no-op
+}
+
+/*
+ * Destructor; there is nothing to release.
+ */
+AsciiToUTF8Encoder::~AsciiToUTF8Encoder()
+{
+ // no-op
+}
+
+/*
+ * Counts length of encoded version of given string.
+ *
+ * Every input char is masked to an unsigned value 0x00 - 0xFF.
+ * Values 0x01 - 0x7F count as one byte; 0x00 and 0x80 - 0xFF count
+ * as two bytes. Because of the mask the three byte branch cannot
+ * be reached. Returns 0 when str is NULL.
+ */
+int AsciiToUTF8Encoder::length(p<string> str)
+{
+ // Assert parameter
+ if( str == NULL )
+ {
+ // Nothing to encode
+ return 0 ;
+ }
+
+ int ch,
+ utflen = 0,
+ strlen = (int)str->length() ;
+
+ for( int i = 0 ; i < strlen ; i++ )
+ {
+ ch = ((*str)[i] & 0xFF) ;
+
+ // Single byte in range 0x0001 - 0x007F
+ if( (ch >= 0x0001) && (ch <= 0x007F) )
+ utflen++ ;
+
+ // Triple bytes above 0x07FF (should never occur, ASCII 0x00 - 0xFF)
+ else if( ch > 0x07FF )
+ utflen += 3 ;
+
+ // Double bytes for 0x0000 and in range 0x0080 - 0x07FF
+ else
+ utflen += 2 ;
+ }
+ return utflen ;
+}
+
+/*
+ * Encodes given string from ASCII into modified UTF-8.
+ *
+ * Returns a newly allocated string holding the encoded bytes and
+ * stores the number of encoded bytes in *enclen. Chars 0x01 - 0x7F
+ * are copied as is; 0x00 and 0x80 - 0xFF become two bytes. For a
+ * NULL str the result is NULL and *enclen is set to 0.
+ * enclen is dereferenced unconditionally and must not be NULL.
+ */
+p<string> AsciiToUTF8Encoder::encode(p<string> str, int *enclen)
+{
+ // Assert parameter
+ if( str == NULL )
+ {
+ // Nothing to encode
+ *enclen = 0 ;
+ return NULL ;
+ }
+
+ p<string> encstr = new string("") ;
+ int ch, strlen = (int)str->length() ;
+
+ // Init encoded length
+ *enclen = 0 ;
+
+ // Loop through string and encode each char
+ for( int i = 0 ; i < strlen ; i++ )
+ {
+ ch = ((*str)[i] & 0xFF) ;
+
+ // Single byte in range 0x0001 - 0x007F
+ if( (ch >= 0x0001) && (ch <= 0x007F) )
+ {
+ encstr->append(1, (char)ch) ;
+ (*enclen)++ ;
+ }
+ // Triple bytes above 0x07FF (should never occur, ASCII 0x00 - 0xFF)
+ else if( ch > 0x07FF )
+ {
+ encstr->append(1, (char)( ((ch >> 12) & 0x0F) | 0xE0 )) ;
+ encstr->append(1, (char)( ((ch >> 6) & 0x3F) | 0x80 )) ;
+ encstr->append(1, (char)( ((ch >> 0) & 0x3F) | 0x80 )) ;
+ *enclen += 3 ;
+ }
+ // Double bytes for 0x0000 and in range 0x0080 - 0x07FF
+ else
+ {
+ encstr->append(1, (char)( ((ch >> 6) & 0x1F) | 0xC0 )) ;
+ encstr->append(1, (char)( ((ch >> 0) & 0x3F) | 0x80 )) ;
+ *enclen += 2 ;
+ }
+ }
+ return encstr ;
+}
+
+/*
+ * Decodes given string from modified UTF-8 into ASCII.
+ *
+ * Returns a newly allocated string with one char per decoded
+ * sequence, or NULL when str is NULL or empty. Throws exception()
+ * when a multi byte sequence is truncated, when a continuation
+ * byte does not have the form 10xxxxxx, or when the lead byte is
+ * of an unsupported form. Each decoded value is narrowed to a
+ * single char, so values above 0xFF lose their upper bits.
+ */
+p<string> AsciiToUTF8Encoder::decode(p<string> str)
+{
+ // Assert argument
+ if( str == NULL || str->length() == 0 )
+ return NULL ;
+
+ p<string> decstr = new string("") ;
+ int length = (int)str->length() ;
+ unsigned char ch, ch2, ch3;
+ int i = 0 ;
+
+ // Loop through and decode each char
+ while( i < length )
+ {
+ ch = ((*str)[i] & 0xFF) ;
+
+ switch( ch >> 4 )
+ {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7: // Single byte char, 0xxxxxxx
+ i++ ;
+ decstr->append( 1, (char)ch ) ;
+ break ;
+ case 12:
+ case 13: // Double bytes char, 110xxxxx 10xxxxxx
+ i += 2 ;
+
+ if( i > length )
+ throw exception() ;
+
+ ch2 = (*str)[i - 1] ;
+ if( (ch2 & 0xC0) != 0x80 )
+ throw exception() ;
+
+ decstr->append( 1, (char)(((ch & 0x1F) << 6) | (ch2 & 0x3F)) ) ;
+ break ;
+ case 14: // Triple bytes char, 1110xxxx 10xxxxxx 10xxxxxx
+ i += 3 ;
+ if( i > length )
+ throw exception() ;
+
+ ch2 = (*str)[i - 2] ;
+ ch3 = (*str)[i - 1] ;
+ if( ((ch2 & 0xC0) != 0x80) || ((ch3 & 0xC0) != 0x80) )
+ throw exception();
+
+ decstr->append( 1, (char)(((ch & 0x0F) << 12) | ((ch2 & 0x3F) << 6) | ((ch3 & 0x3F) << 0)) ) ;
+ break ;
+ default: // Unsupported, 10xxxxxx 1111xxxx
+ throw exception() ;
+ }
+ }
+ return decstr ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_AsciiToUTF8Encoder_hpp_
+#define Ppr_AsciiToUTF8Encoder_hpp_
+
+#include <string>
+#include <ppr/io/ByteArrayOutputStream.hpp>
+#include <ppr/io/encoding/ICharsetEncoder.hpp>
+#include <ppr/util/ifr/array>
+#include <ppr/util/ifr/p>
+
+namespace apache
+{
+ namespace ppr
+ {
+ namespace io
+ {
+ namespace encoding
+ {
+ using namespace std;
+ using namespace ifr;
+ using namespace ppr::io ;
+
+/*
+ * Character encoder for extended ASCII to modified UTF-8 encoding.
+ * Implements the three ICharsetEncoder operations length(),
+ * encode() and decode(); the class declares no data members.
+ */
+class AsciiToUTF8Encoder : public ICharsetEncoder
+{
+private:
+
+public:
+ static const char* NAME ; // Encoder name, defined in the .cpp file
+
+public:
+ AsciiToUTF8Encoder() ;
+ virtual ~AsciiToUTF8Encoder() ;
+
+ virtual int length(p<string> str) ; // Encoded length of str, 0 for NULL
+ virtual p<string> encode(p<string> str, int *enclen) ; // Encoded copy, byte count stored in *enclen
+ virtual p<string> decode(p<string> str) ; // Decoded copy, NULL for NULL or empty input
+} ;
+
+/* namespace */
+ }
+ }
+ }
+}
+
+#endif /*Ppr_AsciiToUTF8Encoder_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ppr/io/encoding/CharsetEncoderRegistry.hpp"
+#include <cctype>
+#include <algorithm>
+
+using namespace apache::ppr::io::encoding;
+
+// Init static members: the registry map and the name of the default encoder
+map<string, p<ICharsetEncoder> > CharsetEncoderRegistry::encoders ;
+const char* CharsetEncoderRegistry::DEFAULT = AsciiToUTF8Encoder::NAME ;
+
+// Init the default set of encoders; the MapInitializer constructor calls addEncoder(), so this object is defined after the map above
+static CharsetEncoderRegistry::MapInitializer initializer ;
+
+
+// --- Constructors -------------------------------------------------
+
+/*
+ * Default constructor; there is nothing to initialize.
+ */
+CharsetEncoderRegistry::CharsetEncoderRegistry()
+{
+ // no-op
+}
+
+/*
+ * Destructor; there is nothing to release.
+ */
+CharsetEncoderRegistry::~CharsetEncoderRegistry()
+{
+ // no-op
+}
+
+
+// --- Attribute methods --------------------------------------------
+
+/*
+ * Returns the encoder registered under the DEFAULT name, or NULL
+ * when no encoder is registered under that name.
+ */
+p<ICharsetEncoder> CharsetEncoderRegistry::getEncoder()
+{
+ return getEncoder( CharsetEncoderRegistry::DEFAULT ) ;
+}
+
+/*
+ * Looks up an encoder by name. The name is converted to lower case
+ * before the lookup, so matching is case insensitive. Returns NULL
+ * when name is NULL or when no encoder is registered under it.
+ */
+p<ICharsetEncoder> CharsetEncoderRegistry::getEncoder(const char* name)
+{
+ // Assert argument
+ if( name == NULL )
+ return NULL ;
+
+ map<string, p<ICharsetEncoder> >::iterator tempIter ;
+ string key = string(name) ;
+
+ // Make key string all lower case
+ std::transform(key.begin(), key.end(), key.begin(), (int(*)(int))tolower) ; // The explicit cast is needed to compile on Linux
+
+ // Check if key exists in map
+ tempIter = encoders.find( key ) ;
+ if( tempIter != encoders.end() )
+ return tempIter->second ;
+ else // Not found
+ return NULL ;
+}
+
+
+// --- Operation methods --------------------------------------------
+
+/*
+ * Registers an encoder under the lower cased name. An encoder that
+ * is already stored under the same key is replaced. Throws
+ * IllegalArgumentException when name or encoder is NULL.
+ */
+void CharsetEncoderRegistry::addEncoder(const char* name, p<ICharsetEncoder> encoder) throw(IllegalArgumentException)
+{
+ // Assert arguments
+ if( name == NULL || encoder == NULL )
+ throw IllegalArgumentException("Name and/or encoder cannot be NULL") ;
+
+ // Make key string all lower case
+ string key = string(name) ;
+ std::transform(key.begin(), key.end(), key.begin(), (int(*)(int))tolower) ; // The explicit cast is needed to compile on Linux
+
+ encoders[key] = encoder ;
+}
+
+/*
+ * Removes the encoder registered under the lower cased name. When
+ * no such encoder exists the call does nothing. Throws
+ * IllegalArgumentException when name is NULL.
+ */
+void CharsetEncoderRegistry::removeEncoder(const char* name) throw(IllegalArgumentException)
+{
+ // Assert argument
+ if( name == NULL )
+ throw IllegalArgumentException("Name cannot be NULL") ;
+
+ map<string, p<ICharsetEncoder> >::iterator tempIter ;
+ string key = string(name) ;
+
+ // Make key string all lower case
+ std::transform(key.begin(), key.end(), key.begin(), (int(*)(int))tolower) ; // The explicit cast is needed to compile on Linux
+
+ // Check if key exists in map
+ tempIter = encoders.find( key ) ;
+ if( tempIter != encoders.end() )
+ encoders.erase( tempIter ) ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_CharsetEncoderRegistry_hpp_
+#define Ppr_CharsetEncoderRegistry_hpp_
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+#include <string>
+#include <map>
+#include "ppr/io/encoding/ICharsetEncoder.hpp"
+#include "ppr/IllegalArgumentException.hpp"
+#include "ppr/io/encoding/AsciiToUTF8Encoder.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace ppr
+ {
+ namespace io
+ {
+ namespace encoding
+ {
+ using namespace ifr ;
+ using namespace std ;
+ using namespace apache::ppr ;
+
+/*
+ * The character set encoder registry maintains all available
+ * encoders/decoders.
+ *
+ * All operations are static and work on one shared map. The
+ * constructor is protected, and the nested MapInitializer class
+ * registers the default AsciiToUTF8Encoder when an instance of it
+ * is constructed.
+ */
+class CharsetEncoderRegistry
+{
+private:
+ static map<string, p<ICharsetEncoder> > encoders ; // Registered encoders by name
+
+public:
+ // Name of the default encoder
+ static const char* DEFAULT ;
+
+protected:
+ CharsetEncoderRegistry() ;
+
+public:
+ virtual ~CharsetEncoderRegistry() ;
+
+ static p<ICharsetEncoder> getEncoder() ; // Encoder registered under DEFAULT
+ static p<ICharsetEncoder> getEncoder(const char* name) ;
+
+ static void addEncoder(const char* name, p<ICharsetEncoder> encoder) throw(IllegalArgumentException) ;
+ static void removeEncoder(const char* name) throw(IllegalArgumentException) ;
+
+ class MapInitializer
+ {
+ public:
+ MapInitializer()
+ {
+ // Add the default set of encoders
+ CharsetEncoderRegistry::addEncoder(AsciiToUTF8Encoder::NAME, new AsciiToUTF8Encoder() ) ;
+ }
+ } ;
+
+ friend class CharsetEncoderRegistry::MapInitializer ;
+} ;
+
+/* namespace */
+ }
+ }
+ }
+}
+
+#endif /*Ppr_CharsetEncoderRegistry_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/ICharsetEncoder.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/ICharsetEncoder.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/ICharsetEncoder.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/ICharsetEncoder.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_ICharsetEncoder_hpp_
+#define Ppr_ICharsetEncoder_hpp_
+
+#include <string>
+#include "ppr/util/ifr/array"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+ namespace ppr
+ {
+ namespace io
+ {
+ namespace encoding
+ {
+ using namespace ifr ;
+ using namespace std ;
+
+/*
+ * The ICharsetEncoder interface should be implemented by any class
+ * intended to be a character set encoder/decoder.
+ *
+ * It declares three pure virtual operations: length(), encode()
+ * and decode(). All strings are passed and returned as p<string>.
+ */
+struct ICharsetEncoder : Interface
+{
+ virtual int length(p<string> str) = 0 ; // Presumably the length str would have once encoded
+ virtual p<string> encode(p<string> str, int *enclen) = 0 ; // enclen presumably receives the encoded length
+ virtual p<string> decode(p<string> str) = 0 ;
+} ;
+
+/* namespace */
+ }
+ }
+ }
+}
+
+#endif /*Ppr_ICharsetEncoder_hpp_*/
Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/util/Hex.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/util/Hex.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/util/Hex.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/util/Hex.cpp Mon May 15 06:38:57 2006
@@ -47,6 +47,5 @@
sprintf(&result[i*2], "%02x", (unsigned char) buffer[i]) ;
hexStr = new string(result.c_array(), result.size() - 1) ;
-
return hexStr ;
}
Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/IUnitTest.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/IUnitTest.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/IUnitTest.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/IUnitTest.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef IUnitTest_hpp_
+#define IUnitTest_hpp_
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+#include <exception>
+#include "ppr/util/ifr/p"
+
+using namespace ifr;
+using namespace std;
+
+struct IUnitTest : Interface // Interface implemented by the unit test classes
+{
+ virtual void setUp() throw (exception) = 0 ; // Presumably invoked before execute() - confirm against the test runner
+ virtual void execute() throw (exception) = 0 ; // Presumably runs the test itself
+ virtual void tearDown() throw (exception) = 0 ; // Presumably invoked after execute() - confirm against the test runner
+ virtual p<string> toString() = 0 ; // Presumably a printable name or description of the test
+};
+
+#endif /*IUnitTest_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestAsynchQueue.hpp"
+
+/*
+ * Keeps the connection to test against and starts with no recorded error.
+ */
+TestAsynchQueue::TestAsynchQueue(p<IConnection> connection)
+    : connection(connection),
+      error(NULL)
+{
+}
+
+/*
+ * Destructor. Performs no explicit cleanup of its own.
+ */
+TestAsynchQueue::~TestAsynchQueue()
+{
+ // no-op
+}
+
+/*
+ * Opens the session that execute() works with.
+ */
+void TestAsynchQueue::setUp() throw (exception)
+{
+    // Ask the connection given to the constructor for a session
+    this->session = this->connection->createSession() ;
+}
+
+/*
+ * Sends one bytes message to queue "FOO.BAR" and waits for onMessage() to check it.
+ */
+void TestAsynchQueue::execute() throw (exception)
+{
+    p<IQueue>           destination ;
+    p<IMessageConsumer> receiver ;
+    p<IMessageProducer> sender ;
+    p<IBytesMessage>    payload ;
+
+    // Look up the queue used by this test
+    destination = session->getQueue("FOO.BAR") ;
+
+    // Create a producer, then an asynchronous consumer that delivers to this object
+    sender   = session->createProducer(destination) ;
+    receiver = session->createConsumer(destination) ;
+    receiver->setMessageListener( smartify(this) ) ;
+
+    // Fill a bytes message with the values onMessage() reads back
+    payload = session->createBytesMessage() ;
+    payload->writeBoolean(true) ;
+    payload->writeInt(3677490) ;
+    payload->writeString("Hello Binary World!") ;
+
+    // Send it, then block until onMessage() signals (5 is presumably a timeout in seconds)
+    sender->send(payload) ;
+    semaphore.wait(5) ;
+
+    // NOTE(review): only 'error' is inspected, so a wait that ends without a
+    // delivery would not be reported here - confirm against Semaphore::wait()
+    if( error == NULL )
+        return ;
+    throw TraceException( error ) ;
+}
+
+/*
+ * Closes the session opened by setUp() and drops the reference to it.
+ */
+void TestAsynchQueue::tearDown() throw (exception)
+{
+    // Close first, then reset the smart pointer
+    this->session->close() ;
+    this->session = NULL ;
+}
+
+p<string> TestAsynchQueue::toString()
+{
+    // Description of this test case, returned as a newly allocated string
+    return p<string>( new string("Send/receive a byte message to 1 queue listener asynchronously") ) ;
+}
+
+void TestAsynchQueue::onMessage(p<IMessage> message)
+{
+    // Listener callback: stores a description of any problem in 'error',
+    // then always signals the semaphore that execute() waits on.
+    if( message == NULL )
+    {
+        error = "Received a null message" ;
+    }
+    else
+    {
+        p<IBytesMessage> bytes = p_dyncast<IBytesMessage> (message) ;
+
+        if( bytes == NULL )
+        {
+            error = "Received wrong type of message" ;
+        }
+        // Read the fields back in the order execute() wrote them
+        else if( !( bytes->readBoolean() == true &&
+                    bytes->readInt() == 3677490 &&
+                    bytes->readString()->compare("Hello Binary World!") == 0 ) )
+        {
+            error = "Message content has been corrupted" ;
+        }
+    }
+    semaphore.notify() ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef TestAsynchQueue_hpp_
+#define TestAsynchQueue_hpp_
+
+#include <exception>
+#include <string>
+
+#include "cms/IConnection.hpp"
+#include "ppr/thread/Semaphore.hpp"
+#include "ppr/TraceException.hpp"
+#include "ppr/util/ifr/p"
+
+#include "IUnitTest.hpp"
+
+using namespace apache::cms;
+using namespace apache::ppr;
+using namespace apache::ppr::thread;
+using namespace ifr;
+using namespace std;
+
+/*
+ * Tests sending/receiving a message to an asynchronous listener.
+ */
+class TestAsynchQueue : public IUnitTest, public IMessageListener
+{
+private:
+    p<IConnection> connection ;   // Connection handed to the constructor
+    p<ISession>    session ;      // Created in setUp(), closed in tearDown()
+    Semaphore      semaphore ;    // Signalled by onMessage(), awaited by execute()
+    const char*    error ;        // NULL, or a string literal describing the failure (hence const)
+
+public:
+    TestAsynchQueue(p<IConnection> connection) ;
+    virtual ~TestAsynchQueue() ;
+
+    virtual void setUp() throw (exception) ;      // Creates the session
+    virtual void execute() throw (exception) ;    // Sends the message and waits for onMessage()
+    virtual void tearDown() throw (exception) ;   // Closes the session
+    virtual p<string> toString() ;                // Description of the test
+
+    virtual void onMessage(p<IMessage> message) ; // Listener callback; validates the received message
+} ;
+
+#endif /*TestAsynchQueue_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestAsynchTopic.hpp"
+
+/*
+ * Keeps the connection to test against and starts with no recorded error.
+ */
+TestAsynchTopic::TestAsynchTopic(p<IConnection> connection)
+    : connection(connection),
+      error(NULL)
+{
+}
+
+/*
+ * Destructor. Performs no explicit cleanup of its own.
+ */
+TestAsynchTopic::~TestAsynchTopic()
+{
+ // no-op
+}
+
+/*
+ * Opens the session that execute() works with.
+ */
+void TestAsynchTopic::setUp() throw (exception)
+{
+    // Ask the connection given to the constructor for a session
+    this->session = this->connection->createSession() ;
+}
+
+/*
+ * Publishes one map message to topic "TEST.TOPIC" and waits for both consumers to report.
+ */
+void TestAsynchTopic::execute() throw (exception)
+{
+    p<ITopic>           destination ;
+    p<IMessageConsumer> firstReceiver,
+                        secondReceiver ;
+    p<IMessageProducer> sender ;
+    p<IMapMessage>      mapMessage ;
+
+    // Look up the topic used by this test
+    destination = session->getTopic("TEST.TOPIC") ;
+
+    // Create a producer and two asynchronous consumers that both deliver to this object
+    sender = session->createProducer(destination) ;
+    firstReceiver = session->createConsumer(destination) ;
+    firstReceiver->setMessageListener( smartify(this) ) ;
+    secondReceiver = session->createConsumer(destination) ;
+    secondReceiver->setMessageListener( smartify(this) ) ;
+
+    // Fill a map message with the entries onMessage() looks up
+    mapMessage = session->createMapMessage() ;
+    mapMessage->setBoolean("key1", false) ;
+    mapMessage->setInt("key2", 8494845) ;
+    mapMessage->setString("key3", "Hello Map World!") ;
+
+    // Send it, then wait once per consumer (5 is presumably a timeout in seconds)
+    sender->send(mapMessage) ;
+    semaphore.wait(5) ;
+    semaphore.wait(5) ;
+
+    // NOTE(review): only 'error' is inspected, so a wait that ends without a
+    // delivery would not be reported here - confirm against Semaphore::wait()
+    if( error == NULL )
+        return ;
+    throw TraceException( error ) ;
+}
+
+/*
+ * Closes the session opened by setUp() and drops the reference to it.
+ */
+void TestAsynchTopic::tearDown() throw (exception)
+{
+    // Close first, then reset the smart pointer
+    this->session->close() ;
+    this->session = NULL ;
+}
+
+p<string> TestAsynchTopic::toString()
+{
+    // Description of this test case, returned as a newly allocated string
+    return p<string>( new string("Send/receive a map message to 2 topic listener asynchronously") ) ;
+}
+
+void TestAsynchTopic::onMessage(p<IMessage> message)
+{
+    // Listener callback registered on both consumers: stores a description of any
+    // problem in 'error', then always signals the semaphore execute() waits on.
+    if( message == NULL )
+    {
+        error = "Received a null message" ;
+    }
+    else
+    {
+        p<IMapMessage> mapMessage = p_dyncast<IMapMessage> (message) ;
+
+        if( mapMessage == NULL )
+        {
+            error = "Received wrong type of message" ;
+        }
+        // Compare each entry with the value execute() stored under that key
+        else if( !( mapMessage->getBoolean("key1") == false &&
+                    mapMessage->getInt("key2") == 8494845 &&
+                    mapMessage->getString("key3")->compare("Hello Map World!") == 0 ) )
+        {
+            error = "Message content has been corrupted" ;
+        }
+    }
+    semaphore.notify() ;
+}
Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef TestAsynchTopic_hpp_
+#define TestAsynchTopic_hpp_
+
+#include <exception>
+#include <string>
+
+#include "cms/IConnection.hpp"
+#include "ppr/thread/Semaphore.hpp"
+#include "ppr/util/ifr/p"
+
+#include "ppr/TraceException.hpp"
+#include "IUnitTest.hpp"
+
+using namespace apache::cms;
+using namespace apache::ppr;
+using namespace apache::ppr::thread;
+using namespace ifr;
+using namespace std;
+
+/*
+ * Tests sending/receiving a message to two asynchronous listeners.
+ */
+class TestAsynchTopic : public IUnitTest, public IMessageListener
+{
+private:
+    p<IConnection> connection ;   // Connection handed to the constructor
+    p<ISession>    session ;      // Created in setUp(), closed in tearDown()
+    Semaphore      semaphore ;    // Signalled by onMessage(), awaited twice by execute()
+    const char*    error ;        // NULL, or a string literal describing the failure (hence const)
+
+public:
+    TestAsynchTopic(p<IConnection> connection) ;
+    virtual ~TestAsynchTopic() ;
+
+    virtual void setUp() throw (exception) ;      // Creates the session
+    virtual void execute() throw (exception) ;    // Publishes the message and waits for onMessage()
+    virtual void tearDown() throw (exception) ;   // Closes the session
+    virtual p<string> toString() ;                // Description of the test
+
+    virtual void onMessage(p<IMessage> message) ; // Listener callback; validates the received message
+} ;
+
+#endif /*TestAsynchTopic_hpp_*/
Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestLocalTXCommit.hpp"
+
+/*
+ * Keeps the connection to test against and resets the error, commit flag and match counter.
+ */
+TestLocalTXCommit::TestLocalTXCommit(p<IConnection> connection)
+    : connection(connection),
+      error(NULL),
+      transmitted(false),
+      matchCount(0)
+{
+}
+
+/*
+ * Destructor. Performs no explicit cleanup of its own.
+ */
+TestLocalTXCommit::~TestLocalTXCommit()
+{
+ // no-op
+}
+
+/*
+ * Opens the transacted session that execute() works with.
+ */
+void TestLocalTXCommit::setUp() throw (exception)
+{
+    // Ask the connection for a session in TransactionalAckMode
+    this->session = this->connection->createSession( TransactionalAckMode ) ;
+}
+
+/*
+ * Sends two text messages in a local transaction, commits, then waits for onMessage().
+ */
+void TestLocalTXCommit::execute() throw (exception)
+{
+    p<IQueue>           queue ;
+    p<IMessageConsumer> consumer ;
+    p<IMessageProducer> producer ;
+    p<ITextMessage>     message1, message2;
+
+    // Connect to a queue
+    queue = session->getQueue("FOO.BAR") ;
+
+    // Create producer and an asynchronous consumer that delivers to onMessage()
+    producer = session->createProducer(queue) ;
+    consumer = session->createConsumer(queue) ;
+    consumer->setMessageListener( smartify(this) ) ;
+
+    // Create and send the two text messages onMessage() expects
+    message1 = session->createTextMessage("LocalTX 1") ;
+    message2 = session->createTextMessage("LocalTX 2") ;
+    producer->send(message1) ;
+    producer->send(message2) ;
+
+    // Set the flag before committing: onMessage() treats any earlier delivery as an error
+    transmitted = true ;
+    session->commit() ;
+
+    // Wait for asynchronous receive (5 is presumably a timeout in seconds)
+    semaphore.wait(5) ;
+
+    // Fail on a recorded error, or when fewer than two messages were matched (e.g. the wait ran out)
+    if( error != NULL )
+        throw TraceException( error ) ;
+    if( matchCount < 2 )
+        throw TraceException( "Did not receive both committed messages" ) ;
+}
+
+/*
+ * Closes the session opened by setUp() and drops the reference to it.
+ */
+void TestLocalTXCommit::tearDown() throw (exception)
+{
+    // Close first, then reset the smart pointer
+    this->session->close() ;
+    this->session = NULL ;
+}
+
+p<string> TestLocalTXCommit::toString()
+{
+    // Description of this test case, returned as a newly allocated string
+    return p<string>( new string("Sends multiple messages to a queue guarded by a local transaction") ) ;
+}
+
+void TestLocalTXCommit::onMessage(p<IMessage> message)
+{
+    // Listener callback: records any problem in 'error', counts the expected
+    // texts in 'matchCount', and wakes execute() on an error or after 2 matches.
+    // Nothing may be delivered until execute() has flagged the commit.
+    if( !transmitted )
+    {
+        error = "Received a message before transaction was committed" ;
+        session->rollback() ;
+        semaphore.notify() ;
+        return ;
+    }
+
+    if( message == NULL )
+    {
+        error = "Received a null message" ;
+        session->rollback() ;
+        semaphore.notify() ;
+        return ;
+    }
+
+    // NOTE(review): unlike the other failures this one does not roll back - confirm intent
+    p<ITextMessage> msg = p_dyncast<ITextMessage> (message) ;
+    if( msg == NULL )
+    {
+        error = "Received wrong type of message" ;
+        semaphore.notify() ;
+        return ;
+    }
+
+    // The text is unexpected only when it differs from BOTH bodies sent by
+    // execute(); joining the two comparisons with '||' would be true for
+    // every text and the corruption branch could never run.
+    if( msg->getText()->compare("LocalTX 1") != 0 &&
+        msg->getText()->compare("LocalTX 2") != 0 )
+    {
+        error = "Message content has been corrupted" ;
+
+        // Rollback receive, wake up the main thread, and skip the commit below
+        session->rollback() ;
+        semaphore.notify() ;
+        return ;
+    }
+    matchCount++ ;
+
+    // Did we receive both messages?
+    if( matchCount == 2 )
+    {
+        // Commit receive and wake up the main thread
+        session->commit() ;
+        semaphore.notify() ;
+    }
+}
Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef TestLocalTXCommit_hpp_
+#define TestLocalTXCommit_hpp_
+
+#include <exception>
+#include <string>
+
+#include "cms/IConnection.hpp"
+#include "activemq/AcknowledgementMode.hpp"
+#include "ppr/thread/Semaphore.hpp"
+#include "ppr/TraceException.hpp"
+#include "ppr/util/ifr/p"
+
+#include "IUnitTest.hpp"
+
+using namespace apache;
+using namespace apache::cms;
+using namespace apache::ppr;
+using namespace apache::ppr::thread;
+using namespace ifr;
+using namespace std;
+
+/*
+ * Tests sending multiple messages to a queue guarded by a local transaction.
+ */
+class TestLocalTXCommit : public IUnitTest, public IMessageListener
+{
+private:
+    p<IConnection> connection ;   // Connection handed to the constructor
+    p<ISession>    session ;      // Transacted session; created in setUp(), closed in tearDown()
+    Semaphore      semaphore ;    // Signalled by onMessage(), awaited by execute()
+    const char*    error ;        // NULL, or a string literal describing the failure (hence const)
+    bool           transmitted ;  // Set by execute() just before it commits the send
+    int            matchCount ;   // Number of messages onMessage() has counted as expected
+
+public:
+    TestLocalTXCommit(p<IConnection> connection) ;
+    virtual ~TestLocalTXCommit() ;
+
+    virtual void setUp() throw (exception) ;      // Creates the transacted session
+    virtual void execute() throw (exception) ;    // Sends, commits and waits for onMessage()
+    virtual void tearDown() throw (exception) ;   // Closes the session
+    virtual p<string> toString() ;                // Description of the test
+
+    virtual void onMessage(p<IMessage> message) ; // Listener callback; validates and counts messages
+} ;
+
+#endif /*TestLocalTXCommit_hpp_*/