You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/10/09 22:18:26 UTC

svn commit: r454489 [1/2] - in /incubator/activemq/activemq-dotnet/trunk: ./ src/main/csharp/ActiveMQ/ src/main/csharp/ActiveMQ/Commands/ src/main/csharp/ActiveMQ/OpenWire/ src/main/csharp/ActiveMQ/OpenWire/V1/ src/main/csharp/ActiveMQ/OpenWire/V2/ src...

Author: chirino
Date: Mon Oct  9 13:18:25 2006
New Revision: 454489

URL: http://svn.apache.org/viewvc?view=rev&rev=454489
Log:
Added openwire negotiation support so that we can switch to version 2
if the remote broker supports version 2 of the openwire protocol.



Added:
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs
Modified:
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
    incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs
    incubator/activemq/activemq-dotnet/trunk/vs2005-activemq.csproj
    incubator/activemq/activemq-dotnet/trunk/vs2005-msmq.csproj
    incubator/activemq/activemq-dotnet/trunk/vs2005-nms.csproj

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs Mon Oct  9 13:18:25 2006
@@ -15,6 +15,7 @@
 * limitations under the License.
 */
 
+using System;
 using ActiveMQ.OpenWire;
 using NMS;
 
@@ -65,6 +66,23 @@
             set { this.magic = value; }
         }
 
+        /// <summary>True when the magic bytes are present and equal MAGIC byte for byte.</summary>
+        public bool Valid
+        {
+            get
+            {
+                if (magic == null || magic.Length != MAGIC.Length)
+                    return false;
+                for (int index = 0; index < MAGIC.Length; index++)
+                {
+                    if (magic[index] != MAGIC[index])
+                        return false;
+                }
+                // Same length and no mismatching byte.
+                return true;
+            }
+        }
+        
         public int Version
         {
             get { return version; }

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Mon Oct  9 13:18:25 2006
@@ -39,8 +39,9 @@
         private long sessionCounter;
         private long temporaryDestinationCounter;
         private long localTransactionCounter;
-        
-        
+        private bool closing;
+
+
         public Connection(ITransport transport, ConnectionInfo info)
         {
             this.transport = transport;
@@ -93,6 +94,7 @@
                 session.Dispose();
             }
             */
+            closing = true;
             DisposeOf(ConnectionId);
             sessions.Clear();
 			transport.Oneway(new ShutdownInfo());
@@ -269,6 +271,14 @@
             else if (command is BrokerInfo)
             {
                 this.brokerInfo = (BrokerInfo) command;
+            }
+            else if (command is ShutdownInfo)
+            {
+                ShutdownInfo info = (ShutdownInfo)command;
+                if( !closing && !closed )
+                {
+                    OnException(transport, new NMSException("Broker closed this connection."));
+                }
             }
             else
             {

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs Mon Oct  9 13:18:25 2006
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+using System;
 using ActiveMQ.Commands;
 using NMS;
 using System.Threading;
@@ -144,7 +145,13 @@
                 {
                    //here we add the code that if do acknowledge action.
                    message = AutoAcknowledge(message);
-                   listener(message);
+                   try
+                   {
+                       listener(message);
+                   } catch(Exception e)
+                   {
+                       // TODO: what do do if the listener errors out?
+                   }
                 }
 
                 // lets now break to give the acknowledgement a chance to be processed

Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs?view=auto&rev=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs Mon Oct  9 13:18:25 2006
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using ActiveMQ.OpenWire;
+
+namespace ActiveMQ.OpenWire
+{
+    interface IMarshallerFactory // implemented by the V1 and V2 MarshallerFactory classes so OpenWireFormat can create either one by name
+    {
+        void configure(OpenWireFormat format); // installs this factory's marshallers on the given format
+    }
+}

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs Mon Oct  9 13:18:25 2006
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+using System.Reflection;
 using ActiveMQ.Commands;
 using ActiveMQ.OpenWire.V1;
 using System;
@@ -30,18 +31,25 @@
         private BaseDataStreamMarshaller[] dataMarshallers;
         private const byte NULL_TYPE = 0;
 		
-		private int version=1;
+		private int version;
 		private bool stackTraceEnabled=false;
 		private bool tightEncodingEnabled=false;
 		private bool sizePrefixDisabled=false;
-		
+        private int minimumVersion=1;
+
+        private WireFormatInfo preferedWireFormatInfo = new WireFormatInfo();
+        
         public OpenWireFormat()
         {
+            PreferedWireFormatInfo.StackTraceEnabled = false;
+            PreferedWireFormatInfo.TightEncodingEnabled = false;
+            PreferedWireFormatInfo.TcpNoDelayEnabled = false;
+            PreferedWireFormatInfo.CacheEnabled = false;
+            PreferedWireFormatInfo.SizePrefixDisabled = false;
+            PreferedWireFormatInfo.Version = 2; // highest version offered; renegotiateWireFormat uses the lower of this and the peer's version
+            
             dataMarshallers = new BaseDataStreamMarshaller[256];
-            // TODO: We need to dynamically load the marshaller factory based
-            // on the openwire version.
-            MarshallerFactory factory = new MarshallerFactory();
-            factory.configure(this);
+            Version = 1; // the Version setter loads the V1 marshallers into dataMarshallers, so this must stay after the allocation above
         }
                 
         public bool StackTraceEnabled {
@@ -50,7 +58,14 @@
         }
         public int Version {
             get { return version; }
-			set { version = value; }
+			set {
+                // Look the factory up by name with throwOnError=true: an unsupported version then fails with a descriptive TypeLoadException instead of a null type.
+                Assembly dll = Assembly.GetExecutingAssembly();
+                Type type = dll.GetType("ActiveMQ.OpenWire.V"+value+".MarshallerFactory", true);
+                IMarshallerFactory factory = (IMarshallerFactory) Activator.CreateInstance(type);
+                factory.configure(this);
+			    version = value; // recorded only after the factory has been created and configured
+			}
         }
         public bool SizePrefixDisabled {
             get { return sizePrefixDisabled; }
@@ -60,6 +75,20 @@
             get { return tightEncodingEnabled; }
 			set { tightEncodingEnabled = value; }
         }
+
+        /// <summary>Wire format settings this side proposes to the peer; its Version caps the negotiated version.</summary>
+        public WireFormatInfo PreferedWireFormatInfo {
+            get { return this.preferedWireFormatInfo; }
+            set { this.preferedWireFormatInfo = value; }
+        }
+
+        /// <summary>
+        /// Empties the marshaller table so a marshaller factory can repopulate it.
+        /// </summary>
+        public void clearMarshallers()
+        {
+            Array.Clear(dataMarshallers, 0, dataMarshallers.Length);
+        }
         
         public void addMarshaller(BaseDataStreamMarshaller marshaller)
         {
@@ -276,6 +305,21 @@
                 return null;
             }
         }
-        
+
+        /// <summary>Adopts the settings agreed with the peer: the lower of the two versions, and each option only if both sides enabled it.</summary>
+        public void renegotiateWireFormat(WireFormatInfo info)
+        {
+            if (info.Version < minimumVersion)
+            {
+                throw new IOException("Remote wire format (" + info.Version +") is lower than the minimum version required (" + minimumVersion + ")");
+            }
+            // Assigning Version also reloads the marshallers for the selected protocol version.
+            this.Version = Math.Min( PreferedWireFormatInfo.Version, info.Version);
+            this.stackTraceEnabled = info.StackTraceEnabled && PreferedWireFormatInfo.StackTraceEnabled;
+//            this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled;
+//            this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled;
+            this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
+            this.sizePrefixDisabled = info.SizePrefixDisabled && PreferedWireFormatInfo.SizePrefixDisabled;
+        }
     }
 }

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs Mon Oct  9 13:18:25 2006
@@ -34,11 +34,11 @@
 	/// <summary>
 	/// Used to create marshallers for a specific version of the wire protocol
 	/// </summary>
-    public class MarshallerFactory
+    public class MarshallerFactory : IMarshallerFactory
     {
         public void configure(OpenWireFormat format) 
         {
-
+            format.clearMarshallers();
             format.addMarshaller(new LocalTransactionIdMarshaller());
             format.addMarshaller(new PartialCommandMarshaller());
             format.addMarshaller(new IntegerResponseMarshaller());

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs Mon Oct  9 13:18:25 2006
@@ -1,98 +1,99 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//
-// NOTE!: This file is autogenerated - do not modify!
-//        if you need to make a change, please see the Groovy scripts in the
-//        activemq-core module
-//
-
-using System;
-using System.Collections;
-using System.IO;
-
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire;
-using ActiveMQ.OpenWire.V2;
-
-namespace ActiveMQ.OpenWire.V2
-{
-	/// <summary>
-	/// Used to create marshallers for a specific version of the wire protocol
-	/// </summary>
-    public class MarshallerFactory
-    {
-        public void configure(OpenWireFormat format) 
-        {
-            format.addMarshaller(new ActiveMQBytesMessageMarshaller());
-            format.addMarshaller(new ActiveMQMapMessageMarshaller());
-            format.addMarshaller(new ActiveMQMessageMarshaller());
-            format.addMarshaller(new ActiveMQObjectMessageMarshaller());
-            format.addMarshaller(new ActiveMQQueueMarshaller());
-            format.addMarshaller(new ActiveMQStreamMessageMarshaller());
-            format.addMarshaller(new ActiveMQTempQueueMarshaller());
-            format.addMarshaller(new ActiveMQTempTopicMarshaller());
-            format.addMarshaller(new ActiveMQTextMessageMarshaller());
-            format.addMarshaller(new ActiveMQTopicMarshaller());
-            format.addMarshaller(new BrokerIdMarshaller());
-            format.addMarshaller(new BrokerInfoMarshaller());
-            format.addMarshaller(new ConnectionControlMarshaller());
-            format.addMarshaller(new ConnectionErrorMarshaller());
-            format.addMarshaller(new ConnectionIdMarshaller());
-            format.addMarshaller(new ConnectionInfoMarshaller());
-            format.addMarshaller(new ConsumerControlMarshaller());
-            format.addMarshaller(new ConsumerIdMarshaller());
-            format.addMarshaller(new ConsumerInfoMarshaller());
-            format.addMarshaller(new ControlCommandMarshaller());
-            format.addMarshaller(new DataArrayResponseMarshaller());
-            format.addMarshaller(new DataResponseMarshaller());
-            format.addMarshaller(new DestinationInfoMarshaller());
-            format.addMarshaller(new DiscoveryEventMarshaller());
-            format.addMarshaller(new ExceptionResponseMarshaller());
-            format.addMarshaller(new FlushCommandMarshaller());
-            format.addMarshaller(new IntegerResponseMarshaller());
-            format.addMarshaller(new JournalQueueAckMarshaller());
-            format.addMarshaller(new JournalTopicAckMarshaller());
-            format.addMarshaller(new JournalTraceMarshaller());
-            format.addMarshaller(new JournalTransactionMarshaller());
-            format.addMarshaller(new KeepAliveInfoMarshaller());
-            format.addMarshaller(new LastPartialCommandMarshaller());
-            format.addMarshaller(new LocalTransactionIdMarshaller());
-            format.addMarshaller(new MessageAckMarshaller());
-            format.addMarshaller(new MessageDispatchMarshaller());
-            format.addMarshaller(new MessageDispatchNotificationMarshaller());
-            format.addMarshaller(new MessageIdMarshaller());
-            format.addMarshaller(new MessagePullMarshaller());
-            format.addMarshaller(new NetworkBridgeFilterMarshaller());
-            format.addMarshaller(new PartialCommandMarshaller());
-            format.addMarshaller(new ProducerIdMarshaller());
-            format.addMarshaller(new ProducerInfoMarshaller());
-            format.addMarshaller(new RemoveInfoMarshaller());
-            format.addMarshaller(new RemoveSubscriptionInfoMarshaller());
-            format.addMarshaller(new ReplayCommandMarshaller());
-            format.addMarshaller(new ResponseMarshaller());
-            format.addMarshaller(new SessionIdMarshaller());
-            format.addMarshaller(new SessionInfoMarshaller());
-            format.addMarshaller(new ShutdownInfoMarshaller());
-            format.addMarshaller(new SubscriptionInfoMarshaller());
-            format.addMarshaller(new TransactionInfoMarshaller());
-            format.addMarshaller(new WireFormatInfoMarshaller());
-            format.addMarshaller(new XATransactionIdMarshaller());
-
-    	}
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// NOTE!: This file is autogenerated - do not modify!
+//        if you need to make a change, please see the Groovy scripts in the
+//        activemq-core module
+//
+
+using System;
+using System.Collections;
+using System.IO;
+
+using ActiveMQ.Commands;
+using ActiveMQ.OpenWire;
+using ActiveMQ.OpenWire.V2;
+
+namespace ActiveMQ.OpenWire.V2
+{
+	/// <summary>
+	/// Used to create marshallers for a specific version of the wire protocol
+	/// </summary>
+    public class MarshallerFactory : IMarshallerFactory
+    {
+        public void configure(OpenWireFormat format) 
+        {
+            format.clearMarshallers(); // drop whatever a previously configured version registered before adding the V2 set
+            format.addMarshaller(new ActiveMQBytesMessageMarshaller());
+            format.addMarshaller(new ActiveMQMapMessageMarshaller());
+            format.addMarshaller(new ActiveMQMessageMarshaller());
+            format.addMarshaller(new ActiveMQObjectMessageMarshaller());
+            format.addMarshaller(new ActiveMQQueueMarshaller());
+            format.addMarshaller(new ActiveMQStreamMessageMarshaller());
+            format.addMarshaller(new ActiveMQTempQueueMarshaller());
+            format.addMarshaller(new ActiveMQTempTopicMarshaller());
+            format.addMarshaller(new ActiveMQTextMessageMarshaller());
+            format.addMarshaller(new ActiveMQTopicMarshaller());
+            format.addMarshaller(new BrokerIdMarshaller());
+            format.addMarshaller(new BrokerInfoMarshaller());
+            format.addMarshaller(new ConnectionControlMarshaller());
+            format.addMarshaller(new ConnectionErrorMarshaller());
+            format.addMarshaller(new ConnectionIdMarshaller());
+            format.addMarshaller(new ConnectionInfoMarshaller());
+            format.addMarshaller(new ConsumerControlMarshaller());
+            format.addMarshaller(new ConsumerIdMarshaller());
+            format.addMarshaller(new ConsumerInfoMarshaller());
+            format.addMarshaller(new ControlCommandMarshaller());
+            format.addMarshaller(new DataArrayResponseMarshaller());
+            format.addMarshaller(new DataResponseMarshaller());
+            format.addMarshaller(new DestinationInfoMarshaller());
+            format.addMarshaller(new DiscoveryEventMarshaller());
+            format.addMarshaller(new ExceptionResponseMarshaller());
+            format.addMarshaller(new FlushCommandMarshaller());
+            format.addMarshaller(new IntegerResponseMarshaller());
+            format.addMarshaller(new JournalQueueAckMarshaller());
+            format.addMarshaller(new JournalTopicAckMarshaller());
+            format.addMarshaller(new JournalTraceMarshaller());
+            format.addMarshaller(new JournalTransactionMarshaller());
+            format.addMarshaller(new KeepAliveInfoMarshaller());
+            format.addMarshaller(new LastPartialCommandMarshaller());
+            format.addMarshaller(new LocalTransactionIdMarshaller());
+            format.addMarshaller(new MessageAckMarshaller());
+            format.addMarshaller(new MessageDispatchMarshaller());
+            format.addMarshaller(new MessageDispatchNotificationMarshaller());
+            format.addMarshaller(new MessageIdMarshaller());
+            format.addMarshaller(new MessagePullMarshaller());
+            format.addMarshaller(new NetworkBridgeFilterMarshaller());
+            format.addMarshaller(new PartialCommandMarshaller());
+            format.addMarshaller(new ProducerIdMarshaller());
+            format.addMarshaller(new ProducerInfoMarshaller());
+            format.addMarshaller(new RemoveInfoMarshaller());
+            format.addMarshaller(new RemoveSubscriptionInfoMarshaller());
+            format.addMarshaller(new ReplayCommandMarshaller());
+            format.addMarshaller(new ResponseMarshaller());
+            format.addMarshaller(new SessionIdMarshaller());
+            format.addMarshaller(new SessionInfoMarshaller());
+            format.addMarshaller(new ShutdownInfoMarshaller());
+            format.addMarshaller(new SubscriptionInfoMarshaller());
+            format.addMarshaller(new TransactionInfoMarshaller());
+            format.addMarshaller(new WireFormatInfoMarshaller());
+            format.addMarshaller(new XATransactionIdMarshaller());
+
+    	}
+    }
+}

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Mon Oct  9 13:18:25 2006
@@ -75,7 +75,7 @@
         
         public void Oneway(Command command)
         {
-            wireformat.Marshal(command, socketWriter);
+            Wireformat.Marshal(command, socketWriter); // marshal with the transport's current wire format (read through the Wireformat property)
             socketWriter.Flush();
         }
         
@@ -104,7 +104,7 @@
             {
                 try
                 {
-                    Command command = (Command) wireformat.Unmarshal(socketReader);
+                    Command command = (Command) Wireformat.Unmarshal(socketReader);
                     this.commandHandler(this, command);
                 }
                 catch (ObjectDisposedException)
@@ -135,6 +135,11 @@
             set { this.exceptionHandler = value; }
         }
 
+        /// <summary>The wire format this transport uses to marshal and unmarshal commands.</summary>
+        public OpenWireFormat Wireformat {
+            get { return this.wireformat; }
+            set { this.wireformat = value; }
+        }
     }
 }
 

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs Mon Oct  9 13:18:25 2006
@@ -36,15 +36,17 @@
 		{
             // Console.WriteLine("Opening socket to: " + host + " on port: " + port);
             Socket socket = Connect(location.Host, location.Port);
-            ITransport rc = new TcpTransport(socket);
+            TcpTransport tcpTransport = new TcpTransport(socket);
+            ITransport rc = tcpTransport;
 
-			// At present the URI is parsed for options by the ConnectionFactory
+            rc = new WireFormatNegotiator(rc, tcpTransport.Wireformat);
+            
+            // At present the URI is parsed for options by the ConnectionFactory
 			if (UseLogging) {
                 rc = new LoggingTransport(rc);
             }
             rc = new ResponseCorrelator(rc);
             rc = new MutexTransport(rc);
-            rc = new WireFormatNegotiator(rc);
 
             return rc;
         }

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs Mon Oct  9 13:18:25 2006
@@ -1,50 +1,106 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
-using System;
-
-namespace ActiveMQ.Transport
-{
-	
-    /// <summary>
-    /// A Transport which negotiates the wire format
-    /// </summary>
-    public class WireFormatNegotiator : TransportFilter
-    {
-
-        public WireFormatNegotiator(ITransport next) : base(next) {
-        }
-        
-        public override void Start() {
-            base.Start();
-
-
-            // now lets start the protocol negotiation
-            WireFormatInfo info = new WireFormatInfo();
-            info.StackTraceEnabled=false;
-            info.TightEncodingEnabled=false;
-            info.TcpNoDelayEnabled=false;
-            info.CacheEnabled=false;
-            info.SizePrefixDisabled=false;
-            info.Version = 1;
-
-            Oneway(info);
-        }
-    }
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+using System.Threading;
+using ActiveMQ.Commands;
+using ActiveMQ.OpenWire;
+using ActiveMQ.Transport;
+using System;
+using ActiveMQ.Util;
+
+namespace ActiveMQ.Transport
+{
+    /// <summary>
+    /// A transport filter which negotiates the wire format with the peer: it sends the
+    /// local preferred WireFormatInfo on first start and holds back outgoing commands
+    /// until the peer's WireFormatInfo arrives, an exception is reported or it is disposed.
+    /// </summary>
+    public class WireFormatNegotiator : TransportFilter
+    {
+        private readonly OpenWireFormat wireFormat;
+        private readonly TimeSpan negotiateTimeout = new TimeSpan(0, 0, 15); // 15 seconds
+        private readonly AtomicBoolean firstStart = new AtomicBoolean(true);
+        private readonly CountDownLatch readyCountDownLatch = new CountDownLatch(1); // released by OnCommand (WireFormatInfo received), OnException or Dispose
+        private readonly CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1); // released once Start() has attempted to send our own WireFormatInfo
+
+        public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
+            : base(next)
+        {
+            this.wireFormat = wireFormat;
+        }
+        /// <summary>Calls the base Start and, on the first call only, sends the preferred wire format to the peer.</summary>
+        public override void Start() {
+            base.Start();
+            if (firstStart.compareAndSet(true, false))
+            {
+                try
+                {
+                    next.Oneway(wireFormat.PreferedWireFormatInfo);
+                }
+                finally
+                {
+                    wireInfoSentDownLatch.countDown(); // released even if the send throws, so OnCommand is not left waiting
+                }
+            }
+        }
+        /// <summary>Calls the base Dispose and releases the ready latch so callers blocked in Oneway stop waiting.</summary>
+        public override void Dispose() {
+            base.Dispose();
+            readyCountDownLatch.countDown();
+        }
+        /// <summary>Waits up to negotiateTimeout for the ready latch, then forwards the command; throws IOException if the wait fails.</summary>
+        public override void Oneway(Command command)
+        {
+            if (!readyCountDownLatch.await(negotiateTimeout))
+                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+            next.Oneway(command);
+        }
+        /// <summary>Applies the peer's WireFormatInfo when one arrives; every command is then passed on to the command handler.</summary>
+        protected override void OnCommand(ITransport sender, Command command)
+        {
+            if ( command.GetDataStructureType() == WireFormatInfo.ID_WireFormatInfo )
+            {
+                WireFormatInfo info = (WireFormatInfo)command;
+                try
+                {
+                    if (!info.Valid)
+                    {
+                        throw new IOException("Remote wire format magic is invalid");
+                    }
+                    wireInfoSentDownLatch.await(negotiateTimeout); // give Start() the chance to send our own info first; the wait result is not checked
+                    wireFormat.renegotiateWireFormat(info);
+                }
+                catch (Exception e)
+                {
+                    OnException(this, e); // report the failure; the command is still forwarded below
+                }
+                finally
+                {
+                    readyCountDownLatch.countDown(); // unblock Oneway whether or not negotiation succeeded
+                }
+            }
+            this.commandHandler(sender, command);
+        }
+        /// <summary>Releases the ready latch so Oneway callers are not left blocked, then passes the exception to the exception handler.</summary>
+        protected override void OnException(ITransport sender, Exception command)
+        {
+            readyCountDownLatch.countDown();
+            this.exceptionHandler(sender, command);
+        }
+    }
+}
+

Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs?view=auto&rev=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs Mon Oct  9 13:18:25 2006
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace ActiveMQ.Util
+{
+    /// <summary>
+    /// A boolean flag that can be updated atomically with compare-and-set semantics.
+    /// </summary>
+    class AtomicBoolean
+    {
+        // The flag is stored as an int (1 == true, 0 == false) so that it can be
+        // updated with Interlocked.CompareExchange instead of locking on the
+        // instance, which outside code could also lock on.
+        int value;
+
+        /// <summary>Creates the flag with the given initial value.</summary>
+        /// <param name="b">The initial value of the flag.</param>
+        public AtomicBoolean(bool b)
+        {
+            value = b ? 1 : 0;
+        }
+
+        /// <summary>
+        /// Atomically sets the flag to <paramref name="newValue"/> if it currently
+        /// equals <paramref name="expected"/>.
+        /// </summary>
+        /// <param name="expected">The value the flag must currently hold.</param>
+        /// <param name="newValue">The value to store when the comparison succeeds.</param>
+        /// <returns>true if the flag held the expected value and was updated; otherwise false.</returns>
+        public bool compareAndSet(bool expected, bool newValue)
+        {
+            int expectedBits = expected ? 1 : 0;
+            int newBits = newValue ? 1 : 0;
+            // CompareExchange returns the value seen before the operation; the swap
+            // happened exactly when that value matches what the caller expected.
+            int seen = System.Threading.Interlocked.CompareExchange(ref value, newBits, expectedBits);
+            return seen == expectedBits;
+        }
+    }
+}

Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs?view=auto&rev=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs Mon Oct  9 13:18:25 2006
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+
+namespace ActiveMQ.Util
+{
+    /// <summary>
+    /// A counter that threads can wait on until it has been counted down to zero.
+    /// </summary>
+    class CountDownLatch
+    {
+        // Private monitor object, so that code outside this class cannot interfere
+        // with the latch by locking or pulsing the latch instance itself.
+        readonly object mutex = new object();
+        int remaining;
+
+        /// <summary>Creates a latch with the given initial count.</summary>
+        /// <param name="i">Number of countDown calls needed before waiters are released.</param>
+        public CountDownLatch(int i)
+        {
+            remaining=i;
+        }
+
+        /// <summary>
+        /// Decrements the count if it is still positive and wakes all waiting threads.
+        /// Calls made after the count has reached zero have no effect.
+        /// </summary>
+        public void countDown()
+        {
+            lock(mutex)
+            {
+                if( remaining > 0 ) {
+                    remaining--;
+                    Monitor.PulseAll(mutex);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Blocks until the count reaches zero or the timeout elapses.
+        /// </summary>
+        /// <param name="timeout">
+        /// Maximum time to wait; a value of -1 millisecond waits indefinitely, as it
+        /// does for Monitor.Wait.
+        /// </param>
+        /// <returns>true if the count reached zero; false if the timeout elapsed first.</returns>
+        public bool await(TimeSpan timeout)
+        {
+            lock (mutex)
+            {
+                if (remaining <= 0)
+                {
+                    return true;
+                }
+
+                // Same interpretation of "infinite" that Monitor.Wait applies.
+                bool waitForever = (long)timeout.TotalMilliseconds == Timeout.Infinite;
+                System.Diagnostics.Stopwatch elapsed = System.Diagnostics.Stopwatch.StartNew();
+                TimeSpan left = timeout;
+
+                // countDown pulses on every decrement, so a wake-up does not mean the
+                // count is zero: keep waiting for whatever is left of the timeout.
+                while (remaining > 0)
+                {
+                    Monitor.Wait(mutex, left);
+                    if (remaining <= 0)
+                    {
+                        break;
+                    }
+                    if (!waitForever)
+                    {
+                        left = timeout - elapsed.Elapsed;
+                        if (left <= TimeSpan.Zero)
+                        {
+                            return false;
+                        }
+                    }
+                }
+            }
+            return true;
+        }
+    }
+}

Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs?view=auto&rev=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs Mon Oct  9 13:18:25 2006
@@ -0,0 +1,27 @@
+using System;
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+//------------------------------------------------------------------------------
+// <auto-generated>
+//     This code was generated by a tool.
+//     Runtime Version:2.0.50727.42
+//
+//     Changes to this file may cause incorrect behavior and will be lost if
+//     the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Assembly-level attributes: COM visibility, CLS compliance, descriptive
+// metadata (title, description, company, product, copyright) and version numbers.
+[assembly: ComVisibleAttribute(false)]
+[assembly: CLSCompliantAttribute(true)]
+[assembly: AssemblyTitleAttribute("Apache NMS for MSMQ")]
+[assembly: AssemblyDescriptionAttribute("An NMS (.Net Messaging Library) to MSMQ")]
+[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
+[assembly: AssemblyCompanyAttribute("http://incubator.apache.org/activemq/")]
+[assembly: AssemblyProductAttribute("Apache ActiveMQ")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2006 Apache Software Foundation")]
+[assembly: AssemblyTrademarkAttribute("")]
+[assembly: AssemblyCultureAttribute("")]
+[assembly: AssemblyVersionAttribute("4.0")]
+[assembly: AssemblyInformationalVersionAttribute("4.0")]
+

Modified: incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs Mon Oct  9 13:18:25 2006
@@ -196,7 +196,7 @@
 
         protected virtual string CreateDestinationName()
         {
-            return "Test.DotNet." + GetType().Name;
+            // Suffix the name with the current value of DateTime.Now.Ticks.
+            return "Test.DotNet." + GetType().Name + "." + DateTime.Now.Ticks;
         }
         
         protected virtual IMessage CreateMessage()