You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/05/15 15:39:33 UTC

svn commit: r406628 [9/10] - in /incubator/activemq/trunk/openwire-cpp: ./ src/gram/java/org/apache/activemq/openwire/tool/ src/gram/script/ src/main/cpp/activemq/ src/main/cpp/activemq/command/ src/main/cpp/activemq/protocol/ src/main/cpp/activemq/pro...

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/transport/tcp/TcpTransport.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/transport/tcp/TcpTransport.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/transport/tcp/TcpTransport.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/activemq/transport/tcp/TcpTransport.hpp Mon May 15 06:38:57 2006
@@ -26,7 +26,6 @@
 #include <map>
 #include "cms/CmsException.hpp"
 #include "activemq/BrokerException.hpp"
-#include "activemq/ICommand.hpp"
 #include "activemq/command/BaseCommand.hpp"
 #include "activemq/command/Response.hpp"
 #include "activemq/command/ExceptionResponse.hpp"
@@ -35,6 +34,10 @@
 #include "activemq/transport/ITransport.hpp"
 #include "activemq/transport/ICommandListener.hpp"
 #include "ppr/InvalidOperationException.hpp"
+#include "ppr/io/DataInputStream.hpp"
+#include "ppr/io/DataOutputStream.hpp"
+#include "ppr/io/BufferedInputStream.hpp"
+#include "ppr/io/BufferedOutputStream.hpp"
 #include "ppr/io/SocketInputStream.hpp"
 #include "ppr/io/SocketOutputStream.hpp"
 #include "ppr/net/ISocket.hpp"
@@ -73,14 +76,14 @@
 class TcpTransport : public ITransport
 {
 private:
-    p<IProtocol>                 protocol ;
-    p<SocketInputStream>         reader ;
-    p<SocketOutputStream>        writer ;
-    p<ICommandListener>          listener ;
-    p<ReadThread>                readThread ;
-    p<ISocket>                   socket ;
-    bool                         closed,
-                                 started ;
+    p<IProtocol>        protocol ;
+    p<DataInputStream>  istream ;
+    p<DataOutputStream> ostream ;
+    p<ICommandListener> listener ;
+    p<ReadThread>       readThread ;
+    p<ISocket>          socket ;
+    bool                closed,
+                        started ;
 
 public:
     TcpTransport(p<ISocket> socket, p<IProtocol> wireProtocol) ;
@@ -90,9 +93,9 @@
     virtual p<ICommandListener> getCommandListener() ;
 
     virtual void start() ;
-    virtual void oneway(p<ICommand> command) ;
-    virtual p<FutureResponse> asyncRequest(p<ICommand> command) ;
-    virtual p<Response> request(p<ICommand> command) ;
+    virtual void oneway(p<BaseCommand> command) ;
+    virtual p<FutureResponse> asyncRequest(p<BaseCommand> command) ;
+    virtual p<Response> request(p<BaseCommand> command) ;
 
 public:
     void readLoop() ;

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IBytesMessage.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IBytesMessage.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IBytesMessage.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IBytesMessage.hpp Mon May 15 06:38:57 2006
@@ -51,7 +51,7 @@
     virtual int readInt() throw (MessageNotReadableException, MessageEOFException) = 0 ;
     virtual long long readLong() throw (MessageNotReadableException, MessageEOFException) = 0 ;
     virtual short readShort() throw (MessageNotReadableException, MessageEOFException) = 0 ;
-    virtual p<string> readUTF() throw (MessageNotReadableException, MessageEOFException) = 0 ;
+    virtual p<string> readString() throw (MessageNotReadableException, MessageEOFException) = 0 ;
     virtual void writeBoolean(bool value) throw (MessageNotWritableException) = 0 ;
     virtual void writeByte(char value) throw (MessageNotWritableException) = 0 ;
     virtual void writeBytes(char* value, int index, int length) throw (MessageNotWritableException) = 0 ;
@@ -60,7 +60,7 @@
     virtual void writeInt(int value) throw (MessageNotWritableException) = 0 ;
     virtual void writeLong(long long value) throw (MessageNotWritableException) = 0 ;
     virtual void writeShort(short value) throw (MessageNotWritableException) = 0 ;
-    virtual void writeUTF(const char* value) throw (MessageNotWritableException) = 0 ;
+    virtual void writeString(const char* value) throw (MessageNotWritableException) = 0 ;
 } ;
 
 /* namespace */

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IMapMessage.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IMapMessage.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IMapMessage.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/cms/IMapMessage.hpp Mon May 15 06:38:57 2006
@@ -23,6 +23,8 @@
 #endif
 
 #include "cms/IMessage.hpp"
+#include "cms/MessageFormatException.hpp"
+#include "ppr/IllegalArgumentException.hpp"
 #include "ppr/util/MapItemHolder.hpp"
 #include "ppr/util/ifr/p"
 
@@ -30,6 +32,7 @@
 {
   namespace cms
   {
+    using namespace apache::ppr;
     using namespace apache::ppr::util;
     using namespace ifr;
 
@@ -39,6 +42,24 @@
 struct IMapMessage : IMessage
 {
     virtual p<PropertyMap> getBody() = 0 ;
+    virtual bool getBoolean(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setBoolean(const char* name, bool value) throw (IllegalArgumentException) = 0 ;
+    virtual char getByte(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setByte(const char* name, char value) throw (IllegalArgumentException) = 0 ;
+    virtual array<char> getBytes(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setBytes(const char* name, array<char> value) throw (IllegalArgumentException) = 0 ;
+    virtual double getDouble(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setDouble(const char* name, double value) throw (IllegalArgumentException) = 0 ;
+    virtual float getFloat(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setFloat(const char* name, float value) throw (IllegalArgumentException) = 0 ;
+    virtual int getInt(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setInt(const char* name, int value) throw (IllegalArgumentException) = 0 ;
+    virtual long long getLong(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setLong(const char* name, long long value) throw (IllegalArgumentException) = 0 ;
+    virtual short getShort(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setShort(const char* name, short value) throw (IllegalArgumentException) = 0 ;
+    virtual p<string> getString(const char* name) throw (MessageFormatException, IllegalArgumentException) = 0 ;
+    virtual void setString(const char* name, const char* value) throw (IllegalArgumentException) = 0 ;
 } ;
 
 /* namespace */

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,132 @@
+/* 
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ppr/io/BufferedInputStream.hpp"
+
+using namespace apache::ppr::io;
+
+/*
+ * Wraps the given input stream with a read buffer of DEFAULT_SIZE bytes.
+ * The buffer starts out empty (position and treshold both zero).
+ */
+BufferedInputStream::BufferedInputStream(p<IInputStream> istream)
+    : istream(istream),
+      buffer(new char[DEFAULT_SIZE]),
+      size(DEFAULT_SIZE),
+      position(0),
+      treshold(0)
+{
+}
+
+/*
+ * Wraps the given input stream with a read buffer of the requested size.
+ * A non-positive size cannot be used as an array length, so DEFAULT_SIZE
+ * is substituted in that case. The buffer starts out empty.
+ */
+BufferedInputStream::BufferedInputStream(p<IInputStream> istream, int size)
+{
+    this->istream = istream ;
+    this->size    = size ;
+
+    // Guard against zero/negative sizes before they reach new[]
+    if( this->size <= 0 )
+        this->size = DEFAULT_SIZE ;
+
+    buffer        = new char[this->size] ;
+    position      = 0 ;
+    treshold      = 0 ;
+}
+
+/*
+ * Close stream: cascades the close request to the underlying stream and
+ * frees the internal buffer. Calling it again afterwards is a no-op.
+ */
+void BufferedInputStream::close() throw(IOException)
+{
+    if( istream != NULL )
+    {
+        istream->close() ;
+        istream = NULL ;
+
+        // The buffer was allocated with new[] in the constructors; merely
+        // resetting the pointer would leak it.
+        delete[] buffer ;
+        buffer  = NULL ;
+    }
+}
+
+/*
+ * Read up to length bytes into buf, starting at buf[offset].
+ *
+ * Returns the number of bytes copied from the internal buffer (possibly
+ * fewer than length), or -1 when the buffer is empty and could not be
+ * refilled. A request of at least the buffer size made while the buffer
+ * is empty bypasses the buffer and returns the underlying stream's result.
+ *
+ * Throws IOException if the stream has been closed, and
+ * IllegalArgumentException on a NULL buffer or negative offset/length.
+ * NOTE(review): IllegalArgumentException is not listed in the exception
+ * specification - confirm it derives from IOException.
+ */
+int BufferedInputStream::read(char* buf, int offset, int length) throw(IOException)
+{
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
+    // Assert parameters. offset (where to start in buf) and length (how
+    // many bytes to read) are independent of each other, so an offset
+    // larger than length is valid and must not be rejected.
+    if( buf == NULL || offset < 0 || length < 0 )
+        throw IllegalArgumentException() ;
+
+    // Nothing to do for an empty request
+    if( length == 0 )
+        return 0 ;
+
+    // Have we reached end-of-buffer?
+    if( isEOB() )
+    {
+        // Skip buffering should request be larger than internal buffer
+        if( length >= size )
+            return istream->read(buf, offset, length) ;
+
+        // Load internal buffer with new data
+        loadBuffer() ;
+    }
+    // Still nothing buffered: the refill delivered no data
+    if( isEOB() )
+        return -1 ;
+
+    // Hand out no more than what is currently buffered
+    int bytesRead = min(length, treshold - position) ;
+
+    // Copy read bytes into supplied buffer
+    memcpy(&buf[offset], &buffer[position], bytesRead) ;
+
+    // Advance past the bytes just handed out
+    position += bytesRead ;
+
+    return bytesRead ;
+}
+
+/*
+ * Refill the internal buffer from the underlying stream.
+ */
+void BufferedInputStream::loadBuffer() throw(IOException)
+{
+    // Ask the underlying stream for up to a full buffer of bytes
+    const int loaded = istream->read(buffer, 0, size) ;
+
+    // Nothing arrived: leave position and treshold untouched
+    if( loaded <= 0 )
+        return ;
+
+    // Fresh data: mark how much is valid and rewind to its start
+    treshold = loaded ;
+    position = 0 ;
+}
+
+/*
+ * Throws IOException when there is no underlying stream to read from,
+ * which is the state close() leaves the object in.
+ */
+void BufferedInputStream::checkClosed() throw(IOException)
+{
+    if( istream != NULL )
+        return ;
+
+    throw IOException("Input stream closed") ;
+}
+
+/*
+ * Check if end-of-buffer has been reached, i.e. every byte currently
+ * loaded in the internal buffer has already been handed out.
+ */
+bool BufferedInputStream::isEOB()
+{
+    return !( position < treshold ) ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedInputStream.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,74 @@
+/* 
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_BufferedInputStream_hpp_
+#define Ppr_BufferedInputStream_hpp_
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+#include <stdlib.h>
+#include "ppr/IllegalArgumentException.hpp"
+#include "ppr/io/IInputStream.hpp"
+#include "ppr/io/IOException.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace ppr
+  {
+    namespace io
+    {
+      using namespace ifr ;
+      using namespace apache::ppr ;
+      using namespace apache::ppr::thread ;
+
+/*
+ * Buffers bytes to provide more efficient reading from an
+ * input stream.
+ *
+ * The object owns a raw internal buffer. It is not designed to be copied
+ * by value: copies would share the same buffer pointer.
+ */
+class BufferedInputStream : public IInputStream
+{
+private:
+    p<IInputStream> istream ;    // underlying stream, NULL once closed
+    char*           buffer ;     // internal buffer, allocated with new[]
+    int             size,        // capacity of buffer in bytes
+                    position,    // index of the next unread byte in buffer
+                    treshold ;   // number of valid bytes loaded into buffer
+
+    // Default buffer size
+    static const int DEFAULT_SIZE = 10240 ;
+
+public:
+    BufferedInputStream(p<IInputStream> istream) ;
+    BufferedInputStream(p<IInputStream> istream, int size) ;
+
+    // Release the internal buffer allocated by the constructors
+    // (a no-op when the pointer has already been reset to NULL).
+    virtual ~BufferedInputStream() { delete[] buffer ; }
+
+    // Close the underlying stream; further reads throw IOException.
+    virtual void close() throw(IOException) ;
+
+    // Read up to length bytes into buf starting at buf[offset].
+    virtual int read(char* buf, int offset, int length) throw(IOException) ;
+
+private:
+    void checkClosed() throw(IOException) ;   // throws if stream is closed
+    void loadBuffer() throw(IOException) ;    // refill buffer from istream
+    bool isEOB() ;                            // true if buffer is used up
+} ;
+
+/* namespace */
+    }
+  }
+}
+
+#endif /*Ppr_BufferedInputStream_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,142 @@
+/* 
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ppr/io/BufferedOutputStream.hpp"
+
+using namespace apache::ppr::io;
+
+/*
+ * Wraps the given output stream with a write buffer of DEFAULT_SIZE bytes.
+ * The buffer starts out empty and is considered full at DEFAULT_SIZE bytes.
+ */
+BufferedOutputStream::BufferedOutputStream(p<IOutputStream> ostream)
+    : ostream(ostream),
+      buffer(new char[DEFAULT_SIZE]),
+      size(DEFAULT_SIZE),
+      position(0),
+      treshold(DEFAULT_SIZE)
+{
+}
+
+/*
+ * Wraps the given output stream with a write buffer of the requested size.
+ * A non-positive size cannot be used as an array length, so DEFAULT_SIZE
+ * is substituted in that case. The buffer starts out empty.
+ */
+BufferedOutputStream::BufferedOutputStream(p<IOutputStream> ostream, int size)
+{
+    this->ostream = ostream ;
+    this->size    = size ;
+
+    // Guard against zero/negative sizes before they reach new[]
+    if( this->size <= 0 )
+        this->size = DEFAULT_SIZE ;
+
+    buffer        = new char[this->size] ;
+    position      = 0 ;
+    treshold      = this->size ;
+}
+
+/*
+ * Close stream: flushes any buffered bytes, cascades the close request to
+ * the underlying stream and frees the internal buffer. Calling it again
+ * afterwards is a no-op.
+ */
+void BufferedOutputStream::close() throw(IOException)
+{
+    if( ostream != NULL )
+    {
+        // Flush any remaining bytes
+        flush0() ;
+
+        // Shut down
+        ostream->close() ;
+        ostream = NULL ;
+
+        // The buffer was allocated with new[] in the constructors; merely
+        // resetting the pointer would leak it.
+        delete[] buffer ;
+        buffer  = NULL ;
+    }
+}
+
+/*
+ * Write length bytes taken from buf, starting at buf[offset].
+ *
+ * Requests of at least the buffer size flush the buffer and go straight
+ * to the underlying stream, returning its result. Smaller requests are
+ * copied into the internal buffer, which is flushed whenever it fills up;
+ * length is returned in that case.
+ *
+ * Throws IOException if the stream has been closed, and
+ * IllegalArgumentException on a NULL buffer or negative offset/length.
+ * NOTE(review): IllegalArgumentException is not listed in the exception
+ * specification - confirm it derives from IOException.
+ */
+int BufferedOutputStream::write(const char* buf, int offset, int length) throw(IOException)
+{
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
+    // Assert parameters. offset (where to start in buf) and length (how
+    // many bytes to write) are independent of each other, so an offset
+    // larger than length is valid and must not be rejected.
+    if( buf == NULL || offset < 0 || length < 0 )
+        throw IllegalArgumentException() ;
+
+    // Nothing to do for an empty request
+    if( length == 0 )
+        return 0 ;
+
+    // Skip buffering should request be larger than internal buffer
+    if( length >= size )
+    {
+        // Keep byte order: push out what is already buffered first
+        flush0() ;
+        return ostream->write(buf, offset, length) ;
+    }
+    int start = offset, end = offset + length ;
+
+    while( start < end )
+    {
+        // Copy as much as fits into the remaining buffer space
+        int delta = min(treshold - position, end - start) ;
+        memcpy(&buffer[position], &buf[start], delta) ;
+        start    += delta ;
+        position += delta ;
+
+        // Have we reached end-of-buffer?
+        if( isEOB() )
+            flush0() ;
+    }
+    return length ;
+}
+
+/*
+ * Flush stream: writes out the buffered bytes and flushes the underlying
+ * stream. Throws IOException if the stream has been closed.
+ */
+void BufferedOutputStream::flush() throw(IOException)
+{
+    // Refuse to operate on a stream that has already been closed
+    checkClosed() ;
+
+    // Delegate the actual work to the unchecked helper
+    flush0() ;
+}
+
+/*
+ * Flush buffer: hands any buffered bytes to the underlying stream and
+ * then flushes that stream. Does not check for a closed stream itself.
+ */
+void BufferedOutputStream::flush0() throw(IOException)
+{
+    const bool pending = ( position > 0 ) ;
+
+    // Hand over whatever has accumulated and rewind the buffer
+    if( pending )
+    {
+        ostream->write(buffer, 0, position) ;
+        position = 0 ;
+    }
+
+    // Always propagate the flush, even when nothing was buffered
+    ostream->flush() ;
+}
+
+/*
+ * Throws IOException when there is no underlying stream to write to,
+ * which is the state close() leaves the object in.
+ */
+void BufferedOutputStream::checkClosed() throw(IOException)
+{
+    if( ostream != NULL )
+        return ;
+
+    throw IOException("Output stream closed") ;
+}
+
+/*
+ * Check if end-of-buffer has been reached, i.e. the write position has
+ * arrived at the fill limit (treshold) of the internal buffer.
+ */
+bool BufferedOutputStream::isEOB()
+{
+    return !( position < treshold ) ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/BufferedOutputStream.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,75 @@
+/* 
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_BufferedOutputStream_hpp_
+#define Ppr_BufferedOutputStream_hpp_
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+#include <stdlib.h>
+#include "ppr/IllegalArgumentException.hpp"
+#include "ppr/io/IOutputStream.hpp"
+#include "ppr/io/IOException.hpp"
+#include "ppr/thread/SimpleMutex.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace ppr
+  {
+    namespace io
+    {
+      using namespace ifr ;
+      using namespace apache::ppr ;
+      using namespace apache::ppr::thread ;
+
+/*
+ * Buffers bytes to provide more efficient writing to an
+ * output stream.
+ *
+ * The object owns a raw internal buffer. It is not designed to be copied
+ * by value: copies would share the same buffer pointer.
+ */
+class BufferedOutputStream : public IOutputStream
+{
+private:
+    p<IOutputStream> ostream ;   // underlying stream, NULL once closed
+    char*            buffer ;    // internal buffer, allocated with new[]
+    int              size,       // capacity of buffer in bytes
+                     position,   // index of the next free byte in buffer
+                     treshold ;  // fill level at which buffer is flushed
+
+    // Default buffer size
+    static const int DEFAULT_SIZE = 10240 ;
+
+public:
+    BufferedOutputStream(p<IOutputStream> ostream) ;
+    BufferedOutputStream(p<IOutputStream> ostream, int size) ;
+
+    // Release the internal buffer allocated by the constructors
+    // (a no-op when the pointer has already been reset to NULL).
+    // Does not flush: call close() or flush() to push out pending bytes.
+    virtual ~BufferedOutputStream() { delete[] buffer ; }
+
+    // Flush pending bytes and close the underlying stream.
+    virtual void close() throw(IOException) ;
+
+    // Write out pending bytes and flush the underlying stream.
+    virtual void flush() throw(IOException) ;
+
+    // Write length bytes from buf starting at buf[offset].
+    virtual int write(const char* buf, int offset, int length) throw(IOException) ;
+
+private:
+    void checkClosed() throw(IOException) ;   // throws if stream is closed
+    void flush0() throw(IOException) ;        // flush without closed-check
+    bool isEOB() ;                            // true if buffer is full
+} ;
+
+/* namespace */
+    }
+  }
+}
+
+#endif /*Ppr_BufferedOutputStream_hpp_*/

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayInputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayInputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayInputStream.hpp Mon May 15 06:38:57 2006
@@ -17,7 +17,7 @@
 #ifndef Ppr_ByteArrayInputStream_hpp_
 #define Ppr_ByteArrayInputStream_hpp_
 
-#include "ppr/io/DataInputStream.hpp"
+#include "ppr/io/IInputStream.hpp"
 #include "ppr/io/EOFException.hpp"
 #include "ppr/util/ifr/array"
 #include "ppr/util/ifr/p"
@@ -34,7 +34,7 @@
  * ByteArrayInputStream reads primitive C++ data types from an
  * in-memory byte array.
  */
-class ByteArrayInputStream : public DataInputStream
+class ByteArrayInputStream : public IInputStream
 {
 private:
     array<char> body ;

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayOutputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayOutputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/ByteArrayOutputStream.hpp Mon May 15 06:38:57 2006
@@ -17,7 +17,7 @@
 #ifndef Ppr_ByteArrayOutputStream_hpp_
 #define Ppr_ByteArrayOutputStream_hpp_
 
-#include "ppr/io/DataOutputStream.hpp"
+#include "ppr/io/IOutputStream.hpp"
 #include "ppr/util/ifr/array"
 #include "ppr/util/ifr/p"
 
@@ -33,7 +33,7 @@
  * ByteArrayOutputStream writes primitive C++ data types to a
  * in memory byte array.
  */
-class ByteArrayOutputStream : public DataOutputStream
+class ByteArrayOutputStream : public IOutputStream
 {
 private:
     array<char> body ;

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.cpp Mon May 15 06:38:57 2006
@@ -21,9 +21,19 @@
 /*
  *
  */
-DataInputStream::DataInputStream()
+DataInputStream::DataInputStream(p<IInputStream> istream)
 {
-    // no-op
+    this->istream = istream ;
+    this->encoder = CharsetEncoderRegistry::getEncoder() ;
+}
+
+/*
+ *
+ */
+DataInputStream::DataInputStream(p<IInputStream> istream, const char* encname)
+{
+    this->istream = istream ;
+    this->encoder = CharsetEncoderRegistry::getEncoder(encname) ;
 }
 
 /*
@@ -37,12 +47,40 @@
 /*
  *
  */
+void DataInputStream::close() throw(IOException)
+{
+    // Cascade close request to underlying stream
+    if( istream != NULL )
+    {
+        istream->close() ;
+        istream = NULL ;
+    }
+}
+
+/*
+ *
+ */
+int DataInputStream::read(char* buffer, int offset, int length) throw(IOException)
+{
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
+    // Read buffer
+    return istream->read(buffer, offset, length) ;
+}
+
+/*
+ *
+ */
 char DataInputStream::readByte() throw(IOException)
 {
     char value ;
 
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Read a single byte
-    read(&value, 0, sizeof(char)) ;
+    istream->read(&value, 0, sizeof(char)) ;
     return value ;
 }
 
@@ -62,8 +100,11 @@
 {
     double value ;
 
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Read a double and convert from big endian to little endian if necessary
-    read((char*)&value, 0, sizeof(double) ) ;
+    istream->read((char*)&value, 0, sizeof(double) ) ;
     return ntohd(value) ;
 }
 
@@ -74,8 +115,11 @@
 {
     float value ;
 
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Read a float and convert from big endian to little endian if necessary
-    read((char*)&value, 0, sizeof(float)) ;
+    istream->read((char*)&value, 0, sizeof(float)) ;
     return ntohf(value) ;
 }
 
@@ -86,8 +130,11 @@
 {
     short value ;
 
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Read a short and convert from big endian to little endian if necessary
-    read((char*)&value, 0, sizeof(short)) ;
+    istream->read((char*)&value, 0, sizeof(short)) ;
     return ntohs(value) ;
 }
 
@@ -98,8 +145,11 @@
 {
     int value ;
 
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Read an int and convert from big endian to little endian if necessary
-    read((char*)&value, 0, sizeof(int)) ;
+    istream->read((char*)&value, 0, sizeof(int)) ;
     return ntohi(value) ;
 }
 
@@ -110,8 +160,11 @@
 {
     long long value ;
 
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Read a long long and convert from big endian to little endian if necessary
-    read((char*)&value, 0, sizeof(long long)) ;
+    istream->read((char*)&value, 0, sizeof(long long)) ;
     return ntohll(value) ;
 }
 
@@ -121,26 +174,42 @@
 p<string> DataInputStream::readString() throw(IOException)
 {
     p<string> value ;
-    char*     buffer ;
     short     length ;
 
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Read length of string
     length = readShort() ;
-    if (length < 0) {
-        throw IOException ("Negative length of string");
-    } else if (length > 0) {
-        buffer = new char[length+1] ;
+    if (length < 0)
+        throw IOException ("Negative length of string") ;
+    else if (length > 0)
+    {
+        array<char> buffer = array<char> (length+1) ;
 
         // Read string bytes
-        read(buffer, 0, length) ;
-        *(buffer+length) = '\0' ;
+        istream->read(&buffer[0], 0, length) ;
+        buffer[length] = '\0' ;
 
-        // Create string class
+        // Create string
         value = new string() ;
-        value->assign(buffer) ;
+        value->assign( buffer.c_array() ) ;
 
-        return value ;
-    } else {
-        return new string ("");
+        // Decode string if charset encoder has been configured
+        if( encoder != NULL )
+            value = encoder->decode(value) ;
     }
+    else   // ...empty string
+        value = new string("") ;
+
+    return value ;
+}
+
+/*
+ * Check if stream has been closed.
+ */
+void DataInputStream::checkClosed() throw(IOException)
+{
+    if( istream == NULL )
+        throw IOException("Input stream closed") ;
 }

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataInputStream.hpp Mon May 15 06:38:57 2006
@@ -18,6 +18,8 @@
 #define Ppr_DataInputStream_hpp_
 
 #include "ppr/io/IInputStream.hpp"
+#include "ppr/io/encoding/ICharsetEncoder.hpp"
+#include "ppr/io/encoding/CharsetEncoderRegistry.hpp"
 #include "ppr/util/Endian.hpp"
 #include "ppr/util/ifr/p"
 
@@ -27,13 +29,14 @@
   {
     namespace io
     {
-      using namespace apache::ppr::util; //htonx and ntohx functions.
       using namespace ifr;
+      using namespace apache::ppr::util; //htonx and ntohx functions.
+      using namespace apache::ppr::io::encoding;
 
 /*
  * The DataInputStream class reads primitive C++ data types from an
  * underlying input stream in a Java compatible way. Strings are
- * read as raw bytes, no character decoding is performed.
+ * read as raw bytes or decoded should encoding have been configured.
  *
  * All numeric data types are assumed to be available in big
  * endian (network byte order) and are converted automatically
@@ -43,12 +46,17 @@
  */
 class DataInputStream : public IInputStream
 {
+private:
+    p<IInputStream>    istream ;
+    p<ICharsetEncoder> encoder ;
+
 public:
-    DataInputStream() ;
+    DataInputStream(p<IInputStream> istream) ;
+    DataInputStream(p<IInputStream> istream, const char* encname) ;
     virtual ~DataInputStream() ;
 
-    virtual void close() throw(IOException) = 0 ;
-    virtual int read(char* buffer, int index, int count) throw(IOException) = 0 ;
+    virtual void close() throw(IOException) ;
+    virtual int read(char* buffer, int offset, int length) throw(IOException) ;
     virtual char readByte() throw(IOException) ;
     virtual bool readBoolean() throw(IOException) ;
     virtual double readDouble() throw(IOException) ;
@@ -57,6 +65,9 @@
     virtual int readInt() throw(IOException) ;
     virtual long long readLong() throw(IOException) ;
     virtual p<string> readString() throw(IOException) ;
+
+protected:
+    void checkClosed() throw(IOException) ;
 } ;
 
 /* namespace */

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.cpp Mon May 15 06:38:57 2006
@@ -18,65 +18,194 @@
 
 using namespace apache::ppr::io;
 
+/*
+ * Wraps the given output stream and uses the registry's default charset encoder.
+ */
+DataOutputStream::DataOutputStream(p<IOutputStream> ostream)
+{
+    this->ostream = ostream ;
+    this->encoder = CharsetEncoderRegistry::getEncoder() ;
+}
+
+/*
+ * Wraps the given output stream and looks up the charset encoder registered under encname.
+ */
+DataOutputStream::DataOutputStream(p<IOutputStream> ostream, const char* encname)
+{
+    this->ostream = ostream ;
+    this->encoder = CharsetEncoderRegistry::getEncoder(encname) ;
+}
+
+/*
+ * Destructor; performs no explicit cleanup (close() is not called here).
+ */
+DataOutputStream::~DataOutputStream()
+{
+    // no-op
+}
+
+/*
+ * Closes the underlying stream and drops the reference to it; repeated calls are no-ops.
+ */
+void DataOutputStream::close() throw(IOException)
+{
+    // Cascade close request to underlying stream
+    if( ostream != NULL )
+    {
+        ostream->close() ;
+        ostream = NULL ;
+    }
+}
+
+/*
+ * Flushes the underlying stream; throws IOException if this stream has been closed.
+ */
+void DataOutputStream::flush() throw(IOException)
+{
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
+    // Flush stream
+    ostream->flush() ;
+}
+
+/*
+ * Writes raw bytes via the underlying stream and returns its result; throws IOException if closed.
+ */
+int DataOutputStream::write(const char* buffer, int offset, int length) throw(IOException)
+{
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
+    // Write buffer
+    return ostream->write(buffer, offset, length) ;
+}
+
+/*
+ * Writes a single byte to the underlying stream; throws IOException if closed.
+ */
 void DataOutputStream::writeByte(char value) throw(IOException)
 {
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Write a single byte
-    write((char*)&value, 0, sizeof(char)) ;
+    ostream->write((char*)&value, 0, sizeof(char)) ;
 }
 
+/*
+ * Writes a boolean as a single byte: 0x01 for true, 0x00 for false.
+ */
 void DataOutputStream::writeBoolean(bool value) throw(IOException)
 {
     // Write a boolean
     ( value ) ? writeByte(0x01) : writeByte(0x00) ;
 }
 
+/*
+ * Writes a double after converting it with htond(); throws IOException if closed.
+ */
 void DataOutputStream::writeDouble(double v) throw(IOException)
 {
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Write a double, convert from little endian to big endian if necessary
     double value = htond(v) ;
-    write((char*)&value, 0, sizeof(double)) ;
+    ostream->write((char*)&value, 0, sizeof(double)) ;
 }
 
+/*
+ * Writes a float after converting it with htonf(); throws IOException if closed.
+ */
 void DataOutputStream::writeFloat(float v) throw(IOException)
 {
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Write a float, convert from little endian to big endian if necessary
     float value = htonf(v) ;
-    write((char*)&value, 0, sizeof(float)) ;
+    ostream->write((char*)&value, 0, sizeof(float)) ;
 }
 
+/*
+ * Writes a short after converting it with htons(); throws IOException if closed.
+ */
 void DataOutputStream::writeShort(short v) throw(IOException)
 {
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Write a short, convert from little endian to big endian if necessary
     short value = htons(v) ;
-    write((char*)&value, 0, sizeof(short)) ;
+    ostream->write((char*)&value, 0, sizeof(short)) ;
 }
 
+/*
+ * Writes an int after converting it with htoni(); throws IOException if closed.
+ */
 void DataOutputStream::writeInt(int v) throw(IOException)
 {
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Write an int, convert from little endian to big endian if necessary
     int value = htoni(v) ;
-    write((char*)&value, 0, sizeof(int)) ;
+    ostream->write((char*)&value, 0, sizeof(int)) ;
 }
 
+/*
+ * Writes a long long after converting it with htonll(); throws IOException if closed.
+ */
 void DataOutputStream::writeLong(long long v) throw(IOException)
 {
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Write a long long, convert from little endian to big endian if necessary
     long long value = htonll(v) ;
-    write((char*)&value, 0, sizeof(long long)) ;
+    ostream->write((char*)&value, 0, sizeof(long long)) ;
 }
 
-void DataOutputStream::writeString(p<string> value) throw(IOException)
+/*
+ * Writes value as a short length prefix plus its (optionally encoded) bytes; returns that length, 0 for NULL.
+ */
+int DataOutputStream::writeString(p<string> value) throw(IOException)
 {
+    // Check if underlying stream has been closed
+    checkClosed() ;
+
     // Assert argument
     if( value == NULL )
-        return ;
-    if( value->length() > USHRT_MAX )
+        return 0 ;
+
+    p<string> data ;
+    int       length = (int)value->length() ;
+
+    // Encode string if a charset encoder has been configured
+    if( encoder != NULL )
+        data = encoder->encode(value, &length) ;
+    else
+        data = value ;
+
+    // Assert string length
+    if( length > USHRT_MAX )
         throw IOException("String length exceeds maximum length") ;
 
     // Write length of string
-    short length = (short)value->length() ;
-    writeShort( length ) ;
+    writeShort( (short)length ) ;
 
     // Write string contents
-    write((char*)value->c_str(), 0, length) ;
+    ostream->write((char*)data->c_str(), 0, length) ;
+
+    return length ;
+}
+
+/*
+ * Throws IOException if the underlying stream has been closed (ostream is NULL).
+ */
+void DataOutputStream::checkClosed() throw(IOException)
+{
+    if( ostream == NULL )
+        throw IOException("Output stream closed") ;
 }

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/DataOutputStream.hpp Mon May 15 06:38:57 2006
@@ -18,6 +18,8 @@
 #define Ppr_DataOutputStream_hpp_
 
 #include "ppr/io/IOutputStream.hpp"
+#include "ppr/io/encoding/ICharsetEncoder.hpp"
+#include "ppr/io/encoding/CharsetEncoderRegistry.hpp"
 #include "ppr/util/Endian.hpp"
 #include "ppr/util/ifr/p"
 
@@ -27,13 +29,15 @@
   {
     namespace io
     {
-      using namespace apache::ppr::util; // htonx and ntohx functions.
       using namespace ifr;
+      using namespace apache::ppr::util; // htonx and ntohx functions.
+      using namespace apache::ppr::io::encoding;
 
 /*
  * The DataOutputStream class writes primitive C++ data types to an
  * underlying output stream in a Java compatible way. Strings
- * are written as raw bytes, no character encoding is performed.
+ * are written as raw bytes, or encoded if a charset encoder
+ * has been configured.
  *
  * All numeric data types are written in big endian (network byte
  * order) and if the platform is little endian they are converted
@@ -43,10 +47,18 @@
  */
 class DataOutputStream : public IOutputStream
 {
+private:
+    p<IOutputStream>   ostream ;
+    p<ICharsetEncoder> encoder ;
+
 public:
-    virtual void close() throw(IOException) = 0 ;
-    virtual void flush() throw(IOException) = 0 ;
-    virtual int write(const char* buffer, int index, int count) throw(IOException) = 0 ;
+    DataOutputStream(p<IOutputStream> ostream) ;
+    DataOutputStream(p<IOutputStream> ostream, const char* encname) ;
+    virtual ~DataOutputStream() ;
+
+    virtual void close() throw(IOException) ;
+    virtual void flush() throw(IOException) ;
+    virtual int write(const char* buffer, int offset, int length) throw(IOException) ;
     virtual void writeByte(char v) throw(IOException) ;
     virtual void writeBoolean(bool v) throw(IOException) ;
     virtual void writeDouble(double v) throw(IOException) ;
@@ -54,7 +66,10 @@
     virtual void writeShort(short v) throw(IOException) ;
     virtual void writeInt(int v) throw(IOException) ;
     virtual void writeLong(long long v) throw(IOException) ;
-    virtual void writeString(p<string> v) throw(IOException) ;
+    virtual int writeString(p<string> v) throw(IOException) ;
+
+protected:
+    void checkClosed() throw(IOException) ;
 } ;
 
 /* namespace */

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IInputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IInputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IInputStream.hpp Mon May 15 06:38:57 2006
@@ -45,14 +45,6 @@
 {
     virtual void close() throw(IOException) = 0 ;
     virtual int read(char* buffer, int index, int count) throw(IOException) = 0 ;
-    virtual char readByte() throw(IOException) = 0 ;
-    virtual bool readBoolean() throw(IOException) = 0 ;
-    virtual double readDouble() throw(IOException) = 0 ;
-    virtual float readFloat() throw(IOException) = 0 ;
-    virtual short readShort() throw(IOException) = 0 ;
-    virtual int readInt() throw(IOException) = 0 ;
-    virtual long long readLong() throw(IOException) = 0 ;
-    virtual p<string> readString() throw(IOException) = 0 ;
 } ;
 
 /* namespace */

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IOutputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IOutputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/IOutputStream.hpp Mon May 15 06:38:57 2006
@@ -46,15 +46,7 @@
 {
     virtual void close() throw(IOException) = 0 ;
     virtual void flush() throw(IOException) = 0 ;
-    virtual int write(const char* buffer, int index, int count) throw(IOException) = 0 ;
-    virtual void writeByte(char v) throw(IOException) = 0 ;
-    virtual void writeBoolean(bool v) throw(IOException) = 0 ;
-    virtual void writeDouble(double v) throw(IOException) = 0 ;
-    virtual void writeFloat(float v) throw(IOException) = 0 ;
-    virtual void writeShort(short v) throw(IOException) = 0 ;
-    virtual void writeInt(int v) throw(IOException) = 0 ;
-    virtual void writeLong(long long v) throw(IOException) = 0 ;
-    virtual void writeString(p<string> v) throw(IOException) = 0 ;
+    virtual int write(const char* buffer, int offset, int length) throw(IOException) = 0 ;
 } ;
 
 /* namespace */

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.cpp Mon May 15 06:38:57 2006
@@ -18,7 +18,18 @@
 
 using namespace apache::ppr::io;
 
-void SocketInputStream::close() throw(IOException)
+/*
+ * Creates an input stream on top of the given socket.
+ */
+SocketInputStream::SocketInputStream(p<ISocket> socket)
+{
+    this->socket = socket ;
+}
+
+/*
+ * Destructor; performs no explicit cleanup (close() is not called here).
+ */
+SocketInputStream::~SocketInputStream()
 {
     // no-op
 }
@@ -26,21 +37,37 @@
 /*
  *
  */
-int SocketInputStream::read(char* buf, int index, int size) throw(IOException)
+void SocketInputStream::close() throw(IOException)
 {
-    char* buffer = buf + index ;
-    int   length, remaining = size ;
+    // Cascade close request to underlying socket
+    if( socket != NULL )
+    {
+        socket->close() ;
+        socket = NULL ;
+    }
+}
+
+/*
+ *
+ */
+int SocketInputStream::read(char* buf, int offset, int length) throw(IOException)
+{
+    char* buffer = buf + offset ;
+    //char* buffer = buf + index ;
+    //int   length, remaining = size ;
+    int bytesRead ;
 
     // Loop until requested number of bytes are read
-    while( remaining > 0 )
-    {
+    //while( remaining > 0 )
+    //{
         // Try to read remaining bytes
-        length = remaining ;
+      //  length = remaining ;
 
         try
         {
             // Read some bytes from socket
-            length = socket->receive(buffer, length) ;
+            //length = socket->receive(buffer, length) ;
+            bytesRead = socket->receive(buffer, length) ;
         }
         catch( SocketException se )
         {
@@ -49,8 +76,9 @@
         }
 
         // Adjust buffer pointer and remaining number of bytes
-        buffer    += length ;
-        remaining -= length ;
-	}
-	return size ;
+        //buffer    += length ;
+        //remaining -= length ;
+	//}
+	//return size ;
+    return bytesRead ;
 }

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketInputStream.hpp Mon May 15 06:38:57 2006
@@ -17,7 +17,7 @@
 #ifndef Ppr_SocketInputStream_hpp_
 #define Ppr_SocketInputStream_hpp_
 
-#include "ppr/io/DataInputStream.hpp"
+#include "ppr/io/IInputStream.hpp"
 #include "ppr/net/ISocket.hpp"
 #include "ppr/net/SocketException.hpp"
 #include "ppr/util/ifr/p"
@@ -30,22 +30,23 @@
     {
       using namespace ifr ;
       using namespace apache::ppr::net ;
-      using namespace apache::ppr::util ;
 
 /*
  * SocketInputStream reads primitive C++ data types from a
  * socket stream. It currently uses PPR sockets for
  * platform independency.
  */
-class SocketInputStream : public DataInputStream
+class SocketInputStream : public IInputStream
 {
 private:
     p<ISocket> socket ;
+
 public:
-    SocketInputStream(p<ISocket> socket) : socket (socket) {}
-public:
+    SocketInputStream(p<ISocket> socket) ;
+    virtual ~SocketInputStream() ;
+
     virtual void close() throw(IOException) ;
-    virtual int read(char* buffer, int index, int size) throw(IOException) ;
+    virtual int read(char* buffer, int offset, int length) throw(IOException) ;
 } ;
 
 /* namespace */

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.cpp Mon May 15 06:38:57 2006
@@ -34,11 +34,22 @@
     // no-op
 }
 
+/*
+ * Closes the underlying socket and drops the reference to it; repeated calls are no-ops.
+ */
 void SocketOutputStream::close() throw(IOException)
 {
-    // no-op
+    // Cascade close request to underlying socket
+    if( socket != NULL )
+    {
+        socket->close() ;
+        socket = NULL ;
+    }
 }
 
+/*
+ *
+ */
 void SocketOutputStream::flush() throw(IOException)
 {
     // no-op
@@ -47,21 +58,24 @@
 /*
  *
  */
-int SocketOutputStream::write(const char* buf, int index, int size) throw(IOException)
+int SocketOutputStream::write(const char* buf, int offset, int length) throw(IOException)
 {
-    const char*  buffer = buf + index ;
-    int          length, remaining = size ;
+    const char*  buffer = buf + offset ;
+    //const char*  buffer = buf + index ;
+    //int          length, remaining = size ;
+    int bytesWritten ;
 
     // Loop until requested number of bytes are read
-    while( remaining > 0 )
-    {
+    //while( remaining > 0 )
+    //{
         // Try to write remaining bytes
-        length = remaining ;
+        //length = remaining ;
 
         try
         {
             // Write some bytes to socket
-            length = socket->send(buffer, length) ;
+            //length = socket->send(buffer, length) ;
+            bytesWritten = socket->send(buffer, length) ;
         }
         catch( SocketException se )
         {
@@ -70,8 +84,9 @@
         }
 
         // Adjust buffer pointer and remaining number of bytes
-        buffer    += length ;
-        remaining -= length ;
-	}
-	return size ;
+        //buffer    += length ;
+        //remaining -= length ;
+	//}
+	//return size ;
+    return bytesWritten ;
 }

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.hpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.hpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/SocketOutputStream.hpp Mon May 15 06:38:57 2006
@@ -17,7 +17,7 @@
 #ifndef Ppr_SocketOutputStream_hpp_
 #define Ppr_SocketOutputStream_hpp_
 
-#include "ppr/io/DataOutputStream.hpp"
+#include "ppr/io/IOutputStream.hpp"
 #include "ppr/net/ISocket.hpp"
 #include "ppr/net/SocketException.hpp"
 #include "ppr/util/ifr/p"
@@ -30,14 +30,13 @@
     {
       using namespace ifr ;
       using namespace apache::ppr::net ;
-      using namespace apache::ppr::util ;
 
 /*
  * SocketOutputStream writes primitive C++ data types to a
  * socket stream. It currently uses PPR sockets for
  * platform independency.
  */
-class SocketOutputStream : public DataOutputStream
+class SocketOutputStream : public IOutputStream
 {
 private:
     p<ISocket> socket ;
@@ -48,7 +47,7 @@
 
     virtual void close() throw(IOException) ;
     virtual void flush() throw(IOException) ;
-    virtual int write(const char* buffer, int index, int size) throw(IOException) ;
+    virtual int write(const char* buffer, int offset, int length) throw(IOException) ;
 } ;
 
 /* namespace */

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ppr/io/encoding/AsciiToUTF8Encoder.hpp"
+
+using namespace apache::ppr::io::encoding;
+
+
+// Name of this encoder; CharsetEncoderRegistry registers it under this name
+const char* AsciiToUTF8Encoder::NAME = "AsciiToUTF8" ;
+
+/*
+ * Default constructor; nothing to initialize.
+ */
+AsciiToUTF8Encoder::AsciiToUTF8Encoder()
+{
+    // no-op
+}
+
+/*
+ * Destructor; nothing to release.
+ */
+AsciiToUTF8Encoder::~AsciiToUTF8Encoder()
+{
+    // no-op
+}
+
+/*
+ * Returns the byte count of the modified UTF-8 encoding of str without building it; 0 if str is NULL.
+ */
+int AsciiToUTF8Encoder::length(p<string> str)
+{
+    // Assert parameter
+    if( str == NULL )
+    {
+        // Nothing to encode
+        return 0 ;
+    }
+
+    int  ch,
+         utflen = 0,
+         strlen = (int)str->length() ;
+
+    for( int i = 0 ; i < strlen ; i++ )
+    {
+        ch = ((*str)[i] & 0xFF) ;
+
+        // Single byte in range 0x0001 - 0x007F
+        if( (ch >= 0x0001) && (ch <= 0x007F) )
+            utflen++ ;
+
+        // Triple bytes above 0x07FF (unreachable here, ch is masked to 0x00 - 0xFF)
+        else if( ch > 0x07FF )
+            utflen += 3 ;
+
+        // Double bytes for 0x0000 and in range 0x0080 - 0x07FF
+        else
+            utflen += 2 ;
+    }
+    return utflen ;
+}
+
+/*
+ * Encodes str from ASCII into modified UTF-8; *enclen receives the encoded byte count (NULL str gives NULL and 0).
+ */
+p<string> AsciiToUTF8Encoder::encode(p<string> str, int *enclen)
+{
+    // Assert parameter
+    if( str == NULL )
+    {
+        // Nothing to encode
+        *enclen = 0 ;
+        return NULL ;
+    }
+
+    p<string> encstr = new string("") ;
+    int  ch, strlen = (int)str->length() ;
+
+    // Init encoded length
+    *enclen = 0 ;
+
+    // Loop through string and encode each char
+    for( int i = 0 ; i < strlen ; i++ )
+    {
+        ch = ((*str)[i] & 0xFF) ;
+
+        // Single byte in range 0x0001 - 0x007F
+        if( (ch >= 0x0001) && (ch <= 0x007F) )
+        {
+            encstr->append(1, (char)ch) ;
+            (*enclen)++ ;
+        }
+        // Triple bytes above 0x07FF (unreachable here, ch is masked to 0x00 - 0xFF)
+        else if( ch > 0x07FF )
+        {
+            encstr->append(1, (char)( ((ch >> 12) & 0x0F) | 0xE0 )) ;
+            encstr->append(1, (char)( ((ch >> 6) & 0x3F) | 0x80 )) ;
+            encstr->append(1, (char)( ((ch >> 0) & 0x3F) | 0x80 )) ;
+            *enclen += 3 ;
+        }
+        // Double bytes for 0x0000 and in range 0x0080 - 0x07FF
+        else
+        {
+            encstr->append(1, (char)( ((ch >> 6) & 0x1F) | 0xC0 )) ;
+            encstr->append(1, (char)( ((ch >> 0) & 0x3F) | 0x80 )) ;
+            *enclen += 2 ;
+        }
+    }
+    return encstr ;
+}
+
+/*
+ * Decodes str from modified UTF-8 into ASCII; NULL or empty input yields NULL, malformed input throws exception.
+ */
+p<string> AsciiToUTF8Encoder::decode(p<string> str)
+{
+    // Assert argument
+    if( str == NULL || str->length() == 0 )
+        return NULL ;
+
+    p<string>     decstr = new string("") ;
+    int           length = (int)str->length() ;
+    unsigned char ch, ch2, ch3;
+    int           i = 0 ;
+
+    // Loop through and decode each char; dispatch on the high nibble of the lead byte
+    while( i < length )
+    {
+        ch = ((*str)[i] & 0xFF) ;
+
+        switch( ch >> 4 )
+        {
+            case 0:
+            case 1:
+            case 2:
+            case 3:
+            case 4:
+            case 5:
+            case 6:
+            case 7:   // Single byte char, 0xxxxxxx
+                i++ ;
+                decstr->append( 1, (char)ch ) ;
+                break ;
+            case 12:
+            case 13:  // Double bytes char, 110xxxxx 10xxxxxx (decoded value is cast to a single char)
+                i += 2 ;
+
+                if( i > length )
+                    throw exception() ;
+
+                ch2 = (*str)[i - 1] ;
+                if( (ch2 & 0xC0) != 0x80 )
+                    throw exception() ;
+
+                decstr->append( 1, (char)(((ch & 0x1F) << 6) | (ch2 & 0x3F)) ) ;
+                break ;
+            case 14:  // Triple bytes char, 1110xxxx 10xxxxxx 10xxxxxx (decoded value is cast to a single char)
+                i += 3 ;
+                if( i > length )
+                    throw exception() ;
+
+                ch2 = (*str)[i - 2] ;
+                ch3 = (*str)[i - 1] ;
+                if( ((ch2 & 0xC0) != 0x80) || ((ch3 & 0xC0) != 0x80) )
+                    throw exception();
+
+                decstr->append( 1, (char)(((ch & 0x0F) << 12) | ((ch2 & 0x3F) << 6) | ((ch3 & 0x3F) << 0)) ) ;
+                break ;
+            default:  // Unsupported lead byte: 10xxxxxx (stray continuation byte) or 1111xxxx
+                throw exception() ;
+        }
+    }
+    return decstr ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/AsciiToUTF8Encoder.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_AsciiToUTF8Encoder_hpp_
+#define Ppr_AsciiToUTF8Encoder_hpp_
+
+#include <string>
+#include <ppr/io/ByteArrayOutputStream.hpp>
+#include <ppr/io/encoding/ICharsetEncoder.hpp>
+#include <ppr/util/ifr/array>
+#include <ppr/util/ifr/p>
+
+namespace apache
+{
+  namespace ppr
+  {
+    namespace io
+    {
+      namespace encoding
+      {
+        using namespace std;
+        using namespace ifr;
+        using namespace ppr::io ;
+
+/*
+ * Charset encoder converting between extended ASCII (one byte per char) and modified UTF-8.
+ */
+class AsciiToUTF8Encoder : public ICharsetEncoder
+{
+private:
+
+public:
+    static const char* NAME ;
+
+public:
+    AsciiToUTF8Encoder() ;
+    virtual ~AsciiToUTF8Encoder() ;
+
+    virtual int length(p<string> str) ;
+    virtual p<string> encode(p<string> str, int *enclen) ;
+    virtual p<string> decode(p<string> str) ;
+} ;
+
+/* namespace */
+      }
+    }
+  }
+}
+
+#endif /*Ppr_AsciiToUTF8Encoder_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ppr/io/encoding/CharsetEncoderRegistry.hpp"
+#include <cctype>
+#include <algorithm>
+
+using namespace apache::ppr::io::encoding;
+
+// Define static members: the encoder map and the name of the default encoder
+map<string, p<ICharsetEncoder> > CharsetEncoderRegistry::encoders ;
+const char* CharsetEncoderRegistry::DEFAULT = AsciiToUTF8Encoder::NAME ;
+
+// File-local object whose constructor registers the default set of encoders
+static CharsetEncoderRegistry::MapInitializer initializer ;
+
+
+// --- Constructors -------------------------------------------------
+
+/*
+ * Constructor; does nothing.
+ */
+CharsetEncoderRegistry::CharsetEncoderRegistry()
+{
+    // no-op
+}
+
+/*
+ * Destructor; does nothing.
+ */
+CharsetEncoderRegistry::~CharsetEncoderRegistry()
+{
+    // no-op
+}
+
+
+// --- Attribute methods --------------------------------------------
+
+/*
+ * Returns the encoder registered under the DEFAULT name, or NULL if there is none.
+ */
+p<ICharsetEncoder> CharsetEncoderRegistry::getEncoder()
+{
+    return getEncoder( CharsetEncoderRegistry::DEFAULT ) ;
+}
+
+/*
+ * Returns the encoder registered under name (case-insensitive), or NULL if name is NULL or unknown.
+ */
+p<ICharsetEncoder> CharsetEncoderRegistry::getEncoder(const char* name)
+{
+    // Assert argument
+    if( name == NULL )
+        return NULL ;
+
+    map<string, p<ICharsetEncoder> >::iterator tempIter ;
+    string key = string(name) ;
+
+    // Make key string all lower case
+    std::transform(key.begin(), key.end(), key.begin(), (int(*)(int))tolower) ;  // The explicit cast is needed to compile on Linux
+
+    // Check if key exists in map
+    tempIter = encoders.find( key ) ;
+    if( tempIter != encoders.end() )
+        return tempIter->second ;
+    else   // Not found
+        return NULL ;
+}
+
+
+// --- Operation methods --------------------------------------------
+
+/*
+ * Registers encoder under the lower-cased name, replacing any existing entry; throws if either is NULL.
+ */
+void CharsetEncoderRegistry::addEncoder(const char* name, p<ICharsetEncoder> encoder) throw(IllegalArgumentException)
+{
+    // Assert arguments
+    if( name == NULL || encoder == NULL )
+        throw IllegalArgumentException("Name and/or encoder cannot be NULL") ;
+
+    // Make key string all lower case
+    string key = string(name) ;
+    std::transform(key.begin(), key.end(), key.begin(), (int(*)(int))tolower) ;  // The explicit cast is needed to compile on Linux
+
+    encoders[key] = encoder ;
+}
+
+/*
+ * Removes the encoder registered under name (case-insensitive), if any; throws if name is NULL.
+ */
+void CharsetEncoderRegistry::removeEncoder(const char* name) throw(IllegalArgumentException)
+{
+    // Assert argument
+    if( name == NULL )
+        throw IllegalArgumentException("Name cannot be NULL") ;
+
+    map<string, p<ICharsetEncoder> >::iterator tempIter ;
+    string key = string(name) ;
+
+    // Make key string all lower case
+    std::transform(key.begin(), key.end(), key.begin(), (int(*)(int))tolower) ;  // The explicit cast is needed to compile on Linux
+
+    // Erase the entry only if the key exists in the map
+    tempIter = encoders.find( key ) ;
+    if( tempIter != encoders.end() )
+        encoders.erase( tempIter ) ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/CharsetEncoderRegistry.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_CharsetEncoderRegistry_hpp_
+#define Ppr_CharsetEncoderRegistry_hpp_
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+#include <string>
+#include <map>
+#include "ppr/io/encoding/ICharsetEncoder.hpp"
+#include "ppr/IllegalArgumentException.hpp"
+#include "ppr/io/encoding/AsciiToUTF8Encoder.hpp"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace ppr
+  {
+    namespace io
+    {
+      namespace encoding
+      {
+        using namespace ifr ;
+        using namespace std ;
+        using namespace apache::ppr ;
+
+/*
+ * Static registry that maps encoder names to the available
+ * character set encoder/decoder instances.
+ */
+class CharsetEncoderRegistry
+{
+private:
+    static map<string, p<ICharsetEncoder> > encoders ;
+
+public:
+    // Name of the encoder that getEncoder() without arguments looks up
+    static const char* DEFAULT ;
+
+protected:
+    CharsetEncoderRegistry() ;
+
+public:
+    virtual ~CharsetEncoderRegistry() ;
+
+	static p<ICharsetEncoder> getEncoder() ;
+	static p<ICharsetEncoder> getEncoder(const char* name) ;
+
+	static void addEncoder(const char* name, p<ICharsetEncoder> encoder) throw(IllegalArgumentException) ;
+	static void removeEncoder(const char* name) throw(IllegalArgumentException) ;
+
+    class MapInitializer
+    {
+    public:
+        MapInitializer() 
+        {
+            // Add the default set of encoders (currently only AsciiToUTF8Encoder)
+            CharsetEncoderRegistry::addEncoder(AsciiToUTF8Encoder::NAME, new AsciiToUTF8Encoder() ) ;
+        }
+    } ;
+
+    friend class CharsetEncoderRegistry::MapInitializer ;
+} ;
+
+/* namespace */
+      }
+    }
+  }
+}
+
+#endif /*Ppr_CharsetEncoderRegistry_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/ICharsetEncoder.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/ICharsetEncoder.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/ICharsetEncoder.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/io/encoding/ICharsetEncoder.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef Ppr_ICharsetEncoder_hpp_
+#define Ppr_ICharsetEncoder_hpp_
+
+#include <string>
+#include "ppr/util/ifr/array"
+#include "ppr/util/ifr/p"
+
+namespace apache
+{
+  namespace ppr
+  {
+    namespace io
+    {
+      namespace encoding
+      {
+        using namespace ifr ;
+        using namespace std ;
+
+/*
+ * The ICharsetEncoder interface should be implemented by any class
+ * intended to be a character set encoder/decoder.
+ */
+struct ICharsetEncoder : Interface
+{
+    virtual int length(p<string> str) = 0 ;  // presumably the encoded length of str - TODO confirm
+    virtual p<string> encode(p<string> str, int *enclen) = 0 ;  // enclen: presumably receives the encoded length - TODO confirm
+    virtual p<string> decode(p<string> str) = 0 ;  // decoding counterpart of encode()
+} ;
+
+/* namespace */
+      }
+    }
+  }
+}
+
+#endif /*Ppr_ICharsetEncoder_hpp_*/

Modified: incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/util/Hex.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/util/Hex.cpp?rev=406628&r1=406627&r2=406628&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/util/Hex.cpp (original)
+++ incubator/activemq/trunk/openwire-cpp/src/main/cpp/ppr/util/Hex.cpp Mon May 15 06:38:57 2006
@@ -47,6 +47,5 @@
         sprintf(&result[i*2], "%02x", (unsigned char) buffer[i]) ;
 
     hexStr = new string(result.c_array(), result.size() - 1) ;
-
     return hexStr ;
 }

Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/IUnitTest.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/IUnitTest.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/IUnitTest.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/IUnitTest.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef IUnitTest_hpp_
+#define IUnitTest_hpp_
+
+// Turn off warning message for ignored exception specification
+#ifdef _MSC_VER
+#pragma warning( disable : 4290 )
+#endif
+
+#include <exception>
+#include "ppr/util/ifr/p"
+
+using namespace ifr;
+using namespace std;
+
+struct IUnitTest : Interface
+{
+    virtual void setUp() throw (exception)  = 0 ;  // prepare resources needed by execute()
+    virtual void execute() throw (exception)  = 0 ;  // run the test; implementations throw to report a failure
+    virtual void tearDown() throw (exception)  = 0 ;  // release what setUp() acquired
+    virtual p<string> toString() = 0 ;  // human-readable description of the test
+};
+
+#endif /*IUnitTest_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestAsynchQueue.hpp"
+
+/*
+ * Keeps the connection to test against; no error is recorded initially.
+ */
+TestAsynchQueue::TestAsynchQueue(p<IConnection> connection)
+    : connection(connection),
+      error(NULL)
+{
+}
+
+/*
+ * Destructor; the body performs no explicit clean-up.
+ */
+TestAsynchQueue::~TestAsynchQueue()
+{
+    // no-op
+}
+
+/*
+ * Creates the session used by execute(); tearDown() closes it again.
+ */
+void TestAsynchQueue::setUp() throw (exception)
+{
+    // Create a session on the connection passed to the constructor
+    session = connection->createSession() ;
+}
+
+/*
+ * Sends one bytes message to queue "FOO.BAR"; throws a TraceException if onMessage() recorded an error.
+ */
+void TestAsynchQueue::execute() throw (exception)
+{
+    p<IQueue>             queue ;
+    p<IMessageConsumer>   consumer ;
+    p<IMessageProducer>   producer ;
+    p<IBytesMessage>      message ;
+
+    // Connect to a queue
+    queue = session->getQueue("FOO.BAR") ;
+
+    // Create producer and an asynchronous consumer (this object is the listener)
+    producer = session->createProducer(queue) ;
+    consumer = session->createConsumer(queue) ;
+    consumer->setMessageListener( smartify(this) ) ;
+
+    // Create binary message; onMessage() reads back these same three values
+    message = session->createBytesMessage() ;
+    message->writeBoolean(true) ;
+    message->writeInt(3677490) ;
+    message->writeString("Hello Binary World!") ;
+
+    // Send message
+    producer->send(message) ;
+
+    // Wait for asynchronous message for 5s
+    semaphore.wait(5) ;  // NOTE(review): result ignored - a timeout is presumably not reported as a failure
+
+    // Check if any error was registered by the message handler
+    if( error != NULL )
+        throw TraceException( error ) ;
+}
+
+/*
+ * Closes the session created in setUp() and resets the session pointer.
+ */
+void TestAsynchQueue::tearDown() throw (exception)
+{
+    // Clean up
+    session->close() ;
+    session = NULL ;
+}
+
+p<string> TestAsynchQueue::toString()
+{
+    // One-line description of this test case
+    return new string("Send/receive a byte message to 1 queue listener asynchronously") ;
+}
+
+void TestAsynchQueue::onMessage(p<IMessage> message)
+{
+    // Listener callback: record any problem with the delivery in 'error',
+    // then signal the semaphore that execute() waits on.
+    if( message == NULL )
+    {
+        error = "Received a null message" ;
+    }
+    else
+    {
+        p<IBytesMessage> bytes = p_dyncast<IBytesMessage> (message) ;
+
+        if( bytes == NULL )
+        {
+            error = "Received wrong type of message" ;
+        }
+        // Fields are read back in the order execute() wrote them
+        else if( !( bytes->readBoolean() == true &&
+                    bytes->readInt()     == 3677490 &&
+                    bytes->readString()->compare("Hello Binary World!") == 0 ) )
+        {
+            error = "Message content has been corrupted" ;
+        }
+    }
+    semaphore.notify() ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchQueue.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef TestAsynchQueue_hpp_
+#define TestAsynchQueue_hpp_
+
+#include <exception>
+#include <string>
+
+#include "cms/IConnection.hpp"
+#include "ppr/thread/Semaphore.hpp"
+#include "ppr/TraceException.hpp"
+#include "ppr/util/ifr/p"
+
+#include "IUnitTest.hpp"
+
+using namespace apache::cms;
+using namespace apache::ppr;
+using namespace apache::ppr::thread;
+using namespace ifr;
+using namespace std;
+
+/*
+ * Tests sending a bytes message to a queue and validating it in an asynchronous listener.
+ */
+class TestAsynchQueue : public IUnitTest, public IMessageListener
+{
+private:
+    p<IConnection> connection ;  // connection supplied to the constructor
+    p<ISession>    session ;     // created in setUp(), closed in tearDown()
+    Semaphore      semaphore ;   // notified by onMessage(), waited on by execute()
+    char*          error ;       // NULL, or a string literal describing the failure
+
+public:
+    TestAsynchQueue(p<IConnection> connection) ;
+    virtual ~TestAsynchQueue() ;
+
+    virtual void setUp() throw (exception) ;     // IUnitTest: creates the session
+    virtual void execute() throw (exception) ;   // IUnitTest: sends the message and waits for the listener
+    virtual void tearDown() throw (exception) ;  // IUnitTest: closes the session
+    virtual p<string> toString() ;               // IUnitTest: description of the test
+
+    virtual void onMessage(p<IMessage> message) ;  // IMessageListener callback: validates the received message
+} ;
+
+#endif /*TestAsynchQueue_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestAsynchTopic.hpp"
+
+/*
+ * Keeps the connection to test against; no error is recorded initially.
+ */
+TestAsynchTopic::TestAsynchTopic(p<IConnection> connection)
+    : connection(connection),
+      error(NULL)
+{
+}
+
+/*
+ * Destructor; the body performs no explicit clean-up.
+ */
+TestAsynchTopic::~TestAsynchTopic()
+{
+    // no-op
+}
+
+/*
+ * Creates the session used by execute(); tearDown() closes it again.
+ */
+void TestAsynchTopic::setUp() throw (exception)
+{
+    // Create a session on the connection passed to the constructor
+    session = connection->createSession() ;
+}
+
+/*
+ * Sends one map message to topic "TEST.TOPIC" with two listeners; throws a TraceException if onMessage() recorded an error.
+ */
+void TestAsynchTopic::execute() throw (exception)
+{
+    p<ITopic>           topic ;
+    p<IMessageConsumer> consumer1,
+                        consumer2 ;
+    p<IMessageProducer> producer ;
+    p<IMapMessage>      message ;
+
+    // Connect to a topic
+    topic = session->getTopic("TEST.TOPIC") ;
+
+    // Create producer and two asynchronous consumers (this object listens on both)
+    producer = session->createProducer(topic) ;
+    consumer1 = session->createConsumer(topic) ;
+    consumer1->setMessageListener( smartify(this) ) ;
+    consumer2 = session->createConsumer(topic) ;
+    consumer2->setMessageListener( smartify(this) ) ;
+
+    // Create map message; onMessage() reads back these same three entries
+    message = session->createMapMessage() ;
+    message->setBoolean("key1", false) ;
+    message->setInt("key2", 8494845) ;
+    message->setString("key3", "Hello Map World!") ;
+
+    // Send message
+    producer->send(message) ;
+
+    // Wait for asynchronous messages for 5s each (one wait per consumer)
+    semaphore.wait(5) ;
+    semaphore.wait(5) ;
+
+    // Check if any error was registered by the message handlers
+    if( error != NULL )
+        throw TraceException( error ) ;
+}
+
+/*
+ * Closes the session created in setUp() and resets the session pointer.
+ */
+void TestAsynchTopic::tearDown() throw (exception)
+{
+    // Clean up
+    session->close() ;
+    session = NULL ;
+}
+
+p<string> TestAsynchTopic::toString()
+{
+    // One-line description of this test case
+    return new string("Send/receive a map message to 2 topic listener asynchronously") ;
+}
+
+void TestAsynchTopic::onMessage(p<IMessage> message)
+{
+    // Listener callback used by both consumers: record any problem in
+    // 'error', then signal the semaphore that execute() waits on.
+    if( message == NULL )
+    {
+        error = "Received a null message" ;
+    }
+    else
+    {
+        p<IMapMessage> mapMsg = p_dyncast<IMapMessage> (message) ;
+
+        if( mapMsg == NULL )
+        {
+            error = "Received wrong type of message" ;
+        }
+        // Entries must match what execute() stored under key1..key3
+        else if( !( mapMsg->getBoolean("key1") == false &&
+                    mapMsg->getInt("key2")     == 8494845 &&
+                    mapMsg->getString("key3")->compare("Hello Map World!") == 0 ) )
+        {
+            error = "Message content has been corrupted" ;
+        }
+    }
+    semaphore.notify() ;
+}

Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestAsynchTopic.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef TestAsynchTopic_hpp_
+#define TestAsynchTopic_hpp_
+
+#include <exception>
+#include <string>
+
+#include "cms/IConnection.hpp"
+#include "ppr/thread/Semaphore.hpp"
+#include "ppr/util/ifr/p"
+
+#include "ppr/TraceException.hpp"
+#include "IUnitTest.hpp"
+
+using namespace apache::cms;
+using namespace apache::ppr;
+using namespace apache::ppr::thread;
+using namespace ifr;
+using namespace std;
+
+/*
+ * Tests sending a map message to a topic and validating it in two asynchronous listeners.
+ */
+class TestAsynchTopic : public IUnitTest, public IMessageListener
+{
+private:
+    p<IConnection> connection ;  // connection supplied to the constructor
+    p<ISession>    session ;     // created in setUp(), closed in tearDown()
+    Semaphore      semaphore ;   // notified by onMessage(), waited on twice by execute()
+    char*          error ;       // NULL, or a string literal describing the failure
+
+public:
+    TestAsynchTopic(p<IConnection> connection) ;
+    virtual ~TestAsynchTopic() ;
+
+    virtual void setUp() throw (exception) ;     // IUnitTest: creates the session
+    virtual void execute() throw (exception) ;   // IUnitTest: sends the message and waits for both listeners
+    virtual void tearDown() throw (exception) ;  // IUnitTest: closes the session
+    virtual p<string> toString() ;               // IUnitTest: description of the test
+
+    virtual void onMessage(p<IMessage> message) ;  // IMessageListener callback: validates the received message
+} ;
+
+#endif /*TestAsynchTopic_hpp_*/

Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.cpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.cpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.cpp Mon May 15 06:38:57 2006
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestLocalTXCommit.hpp"
+
+/*
+ * Keeps the connection; starts with no error, nothing sent, no matches.
+ */
+TestLocalTXCommit::TestLocalTXCommit(p<IConnection> connection)
+    : connection(connection),
+      error(NULL),
+      transmitted(false),
+      matchCount(0)
+{
+}
+
+/*
+ * Destructor; the body performs no explicit clean-up.
+ */
+TestLocalTXCommit::~TestLocalTXCommit()
+{
+    // no-op
+}
+
+/*
+ * Creates the session used by execute(); tearDown() closes it again.
+ */
+void TestLocalTXCommit::setUp() throw (exception)
+{
+    // Create a session using TransactionalAckMode so commit()/rollback() apply
+    session = connection->createSession( TransactionalAckMode ) ;
+}
+
+/*
+ * Sends two text messages to queue "FOO.BAR" and commits; throws a TraceException if onMessage() recorded an error.
+ */
+void TestLocalTXCommit::execute() throw (exception)
+{
+    p<IQueue>             queue ;
+    p<IMessageConsumer>   consumer ;
+    p<IMessageProducer>   producer ;
+    p<ITextMessage>       message1, message2;
+
+    // Connect to a queue
+    queue = session->getQueue("FOO.BAR") ;
+
+    // Create producer and an asynchronous consumer (this object is the listener)
+    producer = session->createProducer(queue) ;
+    consumer = session->createConsumer(queue) ;
+    consumer->setMessageListener( smartify(this) ) ;
+
+    // Create text messages; onMessage() accepts exactly these two texts
+    message1 = session->createTextMessage("LocalTX 1") ;
+    message2 = session->createTextMessage("LocalTX 2") ;
+
+    // Send messages
+    producer->send(message1) ;
+    producer->send(message2) ;
+
+    // Commit transaction; the flag is set first so onMessage() accepts deliveries
+    transmitted = true ;
+    session->commit() ;
+
+    // Wait for asynchronous receive for 5s
+    semaphore.wait(5) ;
+
+    // Check if any error was registered by the message handler
+    if( error != NULL )
+        throw TraceException( error ) ;
+}
+
+/*
+ * Closes the session created in setUp() and resets the session pointer.
+ */
+void TestLocalTXCommit::tearDown() throw (exception)
+{
+    // Clean up
+    session->close() ;
+    session = NULL ;
+}
+
+p<string> TestLocalTXCommit::toString()
+{
+    // One-line description of this test case
+    return new string("Sends multiple messages to a queue guarded by a local transaction") ;
+}
+
+void TestLocalTXCommit::onMessage(p<IMessage> message)
+{
+    // Listener callback: every failure is recorded in 'error' and the
+    // semaphore that execute() waits on is signalled.
+    if( !transmitted )
+    {
+        error = "Received a message before transaction was committed" ;
+        session->rollback() ;
+        semaphore.notify() ;
+        return ;
+    }
+
+    if( message == NULL )
+    {
+        error = "Received a null message" ;
+        session->rollback() ;
+        semaphore.notify() ;
+        return ;
+    }
+
+    p<ITextMessage> msg = p_dyncast<ITextMessage> (message) ;
+    if( msg == NULL )
+    {
+        error = "Received wrong type of message" ;
+        semaphore.notify() ;
+        return ;
+    }
+
+    // The text must equal one of the two messages sent by execute(). Testing
+    // "!= 0 || != 0" would hold for every string (no text equals both), so
+    // a mismatch requires BOTH comparisons to differ.
+    if( msg->getText()->compare("LocalTX 1") != 0 &&
+        msg->getText()->compare("LocalTX 2") != 0 ) 
+    {
+        error = "Message content has been corrupted" ;
+        // Rollback receive
+        session->rollback() ;
+        // Wakeup main thread; a corrupted message must not count as a match
+        semaphore.notify() ;
+        return ;
+    }
+    matchCount++ ;
+
+    // Did we receive both messages?
+    if( matchCount == 2 )
+    {
+        // Commit receive
+        session->commit() ;
+
+        // Wakeup main thread
+        semaphore.notify() ;
+    }
+}

Added: incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.hpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.hpp?rev=406628&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.hpp (added)
+++ incubator/activemq/trunk/openwire-cpp/src/test/cpp/TestLocalTXCommit.hpp Mon May 15 06:38:57 2006
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef TestLocalTXCommit_hpp_
+#define TestLocalTXCommit_hpp_
+
+#include <exception>
+#include <string>
+
+#include "cms/IConnection.hpp"
+#include "activemq/AcknowledgementMode.hpp"
+#include "ppr/thread/Semaphore.hpp"
+#include "ppr/TraceException.hpp"
+#include "ppr/util/ifr/p"
+
+#include "IUnitTest.hpp"
+
+using namespace apache;
+using namespace apache::cms;
+using namespace apache::ppr;
+using namespace apache::ppr::thread;
+using namespace ifr;
+using namespace std;
+
+/*
+ * Tests sending multiple messages to a queue guarded by a local transaction.
+ */
+class TestLocalTXCommit : public IUnitTest, public IMessageListener
+{
+private:
+    p<IConnection> connection ;   // connection supplied to the constructor
+    p<ISession>    session ;      // created in setUp(), closed in tearDown()
+    Semaphore      semaphore ;    // notified by onMessage(), waited on by execute()
+    char*          error ;        // NULL, or a string literal describing the failure
+    bool           transmitted ;  // set by execute() just before it commits the send
+    int            matchCount ;   // incremented by onMessage(); receive is committed at 2
+
+public:
+    TestLocalTXCommit(p<IConnection> connection) ;
+    virtual ~TestLocalTXCommit() ;
+
+    virtual void setUp() throw (exception) ;     // IUnitTest: creates the session
+    virtual void execute() throw (exception) ;   // IUnitTest: sends two messages, commits and waits
+    virtual void tearDown() throw (exception) ;  // IUnitTest: closes the session
+    virtual p<string> toString() ;               // IUnitTest: description of the test
+
+    virtual void onMessage(p<IMessage> message) ;  // IMessageListener callback: validates each received message
+} ;
+
+#endif /*TestLocalTXCommit_hpp_*/