You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/10/09 22:18:26 UTC
svn commit: r454489 [1/2] - in /incubator/activemq/activemq-dotnet/trunk: ./
src/main/csharp/ActiveMQ/ src/main/csharp/ActiveMQ/Commands/
src/main/csharp/ActiveMQ/OpenWire/ src/main/csharp/ActiveMQ/OpenWire/V1/
src/main/csharp/ActiveMQ/OpenWire/V2/ src...
Author: chirino
Date: Mon Oct 9 13:18:25 2006
New Revision: 454489
URL: http://svn.apache.org/viewvc?view=rev&rev=454489
Log:
Added OpenWire negotiation support so that we can switch to version 2
if the remote broker supports version 2 of the OpenWire protocol.
Added:
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs
Modified:
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs
incubator/activemq/activemq-dotnet/trunk/vs2005-activemq.csproj
incubator/activemq/activemq-dotnet/trunk/vs2005-msmq.csproj
incubator/activemq/activemq-dotnet/trunk/vs2005-nms.csproj
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs Mon Oct 9 13:18:25 2006
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System;
using ActiveMQ.OpenWire;
using NMS;
@@ -65,6 +66,23 @@
set { this.magic = value; }
}
+ public bool Valid
+ {
+ get
+ {
+ if ( magic == null )
+ return false;
+ if (magic.Length != MAGIC.Length)
+ return false;
+ for (int i = 0; i < magic.Length; i++ )
+ {
+ if( magic[i]!=MAGIC[i] )
+ return false;
+ }
+ return true;
+ }
+ }
+
public int Version
{
get { return version; }
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Mon Oct 9 13:18:25 2006
@@ -39,8 +39,9 @@
private long sessionCounter;
private long temporaryDestinationCounter;
private long localTransactionCounter;
-
-
+ private bool closing;
+
+
public Connection(ITransport transport, ConnectionInfo info)
{
this.transport = transport;
@@ -93,6 +94,7 @@
session.Dispose();
}
*/
+ closing = true;
DisposeOf(ConnectionId);
sessions.Clear();
transport.Oneway(new ShutdownInfo());
@@ -269,6 +271,14 @@
else if (command is BrokerInfo)
{
this.brokerInfo = (BrokerInfo) command;
+ }
+ else if (command is ShutdownInfo)
+ {
+ ShutdownInfo info = (ShutdownInfo)command;
+ if( !closing && !closed )
+ {
+ OnException(transport, new NMSException("Broker closed this connection."));
+ }
}
else
{
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs Mon Oct 9 13:18:25 2006
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+using System;
using ActiveMQ.Commands;
using NMS;
using System.Threading;
@@ -144,7 +145,13 @@
{
//here we add the code that if do acknowledge action.
message = AutoAcknowledge(message);
- listener(message);
+ try
+ {
+ listener(message);
+ } catch(Exception e)
+ {
+ // TODO: what do do if the listener errors out?
+ }
}
// lets now break to give the acknowledgement a chance to be processed
Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs?view=auto&rev=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs Mon Oct 9 13:18:25 2006
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using ActiveMQ.OpenWire;
+
+namespace ActiveMQ.OpenWire
+{
+ interface IMarshallerFactory
+ {
+ void configure(OpenWireFormat format);
+ }
+}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs Mon Oct 9 13:18:25 2006
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+using System.Reflection;
using ActiveMQ.Commands;
using ActiveMQ.OpenWire.V1;
using System;
@@ -30,18 +31,25 @@
private BaseDataStreamMarshaller[] dataMarshallers;
private const byte NULL_TYPE = 0;
- private int version=1;
+ private int version;
private bool stackTraceEnabled=false;
private bool tightEncodingEnabled=false;
private bool sizePrefixDisabled=false;
-
+ private int minimumVersion=1;
+
+ private WireFormatInfo preferedWireFormatInfo = new WireFormatInfo();
+
public OpenWireFormat()
{
+ PreferedWireFormatInfo.StackTraceEnabled = false;
+ PreferedWireFormatInfo.TightEncodingEnabled = false;
+ PreferedWireFormatInfo.TcpNoDelayEnabled = false;
+ PreferedWireFormatInfo.CacheEnabled = false;
+ PreferedWireFormatInfo.SizePrefixDisabled = false;
+ PreferedWireFormatInfo.Version = 2;
+
dataMarshallers = new BaseDataStreamMarshaller[256];
- // TODO: We need to dynamically load the marshaller factory based
- // on the openwire version.
- MarshallerFactory factory = new MarshallerFactory();
- factory.configure(this);
+ Version = 1;
}
public bool StackTraceEnabled {
@@ -50,7 +58,14 @@
}
public int Version {
get { return version; }
- set { version = value; }
+ set {
+
+ Assembly dll = Assembly.GetExecutingAssembly();
+ Type type = dll.GetType("ActiveMQ.OpenWire.V"+value+".MarshallerFactory", false);
+ IMarshallerFactory factory = (IMarshallerFactory) Activator.CreateInstance(type);
+ factory.configure(this);
+ version = value;
+ }
}
public bool SizePrefixDisabled {
get { return sizePrefixDisabled; }
@@ -60,6 +75,20 @@
get { return tightEncodingEnabled; }
set { tightEncodingEnabled = value; }
}
+
+ public WireFormatInfo PreferedWireFormatInfo
+ {
+ get { return preferedWireFormatInfo; }
+ set { preferedWireFormatInfo = value; }
+ }
+
+ public void clearMarshallers()
+ {
+ for (int i=0; i < dataMarshallers.Length; i++ )
+ {
+ dataMarshallers[i] = null;
+ }
+ }
public void addMarshaller(BaseDataStreamMarshaller marshaller)
{
@@ -276,6 +305,21 @@
return null;
}
}
-
+
+ public void renegotiateWireFormat(WireFormatInfo info)
+ {
+ if (info.Version < minimumVersion)
+ {
+ throw new IOException("Remote wire format (" + info.Version +") is lower the minimum version required (" + minimumVersion + ")");
+ }
+
+ this.Version = Math.Min( PreferedWireFormatInfo.Version, info.Version);
+ this.stackTraceEnabled = info.StackTraceEnabled && PreferedWireFormatInfo.StackTraceEnabled;
+// this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled;
+// this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled;
+ this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
+ this.sizePrefixDisabled = info.SizePrefixDisabled && PreferedWireFormatInfo.SizePrefixDisabled;
+
+ }
}
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs Mon Oct 9 13:18:25 2006
@@ -34,11 +34,11 @@
/// <summary>
/// Used to create marshallers for a specific version of the wire protocol
/// </summary>
- public class MarshallerFactory
+ public class MarshallerFactory : IMarshallerFactory
{
public void configure(OpenWireFormat format)
{
-
+ format.clearMarshallers();
format.addMarshaller(new LocalTransactionIdMarshaller());
format.addMarshaller(new PartialCommandMarshaller());
format.addMarshaller(new IntegerResponseMarshaller());
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs Mon Oct 9 13:18:25 2006
@@ -1,98 +1,99 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//
-// NOTE!: This file is autogenerated - do not modify!
-// if you need to make a change, please see the Groovy scripts in the
-// activemq-core module
-//
-
-using System;
-using System.Collections;
-using System.IO;
-
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire;
-using ActiveMQ.OpenWire.V2;
-
-namespace ActiveMQ.OpenWire.V2
-{
- /// <summary>
- /// Used to create marshallers for a specific version of the wire protocol
- /// </summary>
- public class MarshallerFactory
- {
- public void configure(OpenWireFormat format)
- {
- format.addMarshaller(new ActiveMQBytesMessageMarshaller());
- format.addMarshaller(new ActiveMQMapMessageMarshaller());
- format.addMarshaller(new ActiveMQMessageMarshaller());
- format.addMarshaller(new ActiveMQObjectMessageMarshaller());
- format.addMarshaller(new ActiveMQQueueMarshaller());
- format.addMarshaller(new ActiveMQStreamMessageMarshaller());
- format.addMarshaller(new ActiveMQTempQueueMarshaller());
- format.addMarshaller(new ActiveMQTempTopicMarshaller());
- format.addMarshaller(new ActiveMQTextMessageMarshaller());
- format.addMarshaller(new ActiveMQTopicMarshaller());
- format.addMarshaller(new BrokerIdMarshaller());
- format.addMarshaller(new BrokerInfoMarshaller());
- format.addMarshaller(new ConnectionControlMarshaller());
- format.addMarshaller(new ConnectionErrorMarshaller());
- format.addMarshaller(new ConnectionIdMarshaller());
- format.addMarshaller(new ConnectionInfoMarshaller());
- format.addMarshaller(new ConsumerControlMarshaller());
- format.addMarshaller(new ConsumerIdMarshaller());
- format.addMarshaller(new ConsumerInfoMarshaller());
- format.addMarshaller(new ControlCommandMarshaller());
- format.addMarshaller(new DataArrayResponseMarshaller());
- format.addMarshaller(new DataResponseMarshaller());
- format.addMarshaller(new DestinationInfoMarshaller());
- format.addMarshaller(new DiscoveryEventMarshaller());
- format.addMarshaller(new ExceptionResponseMarshaller());
- format.addMarshaller(new FlushCommandMarshaller());
- format.addMarshaller(new IntegerResponseMarshaller());
- format.addMarshaller(new JournalQueueAckMarshaller());
- format.addMarshaller(new JournalTopicAckMarshaller());
- format.addMarshaller(new JournalTraceMarshaller());
- format.addMarshaller(new JournalTransactionMarshaller());
- format.addMarshaller(new KeepAliveInfoMarshaller());
- format.addMarshaller(new LastPartialCommandMarshaller());
- format.addMarshaller(new LocalTransactionIdMarshaller());
- format.addMarshaller(new MessageAckMarshaller());
- format.addMarshaller(new MessageDispatchMarshaller());
- format.addMarshaller(new MessageDispatchNotificationMarshaller());
- format.addMarshaller(new MessageIdMarshaller());
- format.addMarshaller(new MessagePullMarshaller());
- format.addMarshaller(new NetworkBridgeFilterMarshaller());
- format.addMarshaller(new PartialCommandMarshaller());
- format.addMarshaller(new ProducerIdMarshaller());
- format.addMarshaller(new ProducerInfoMarshaller());
- format.addMarshaller(new RemoveInfoMarshaller());
- format.addMarshaller(new RemoveSubscriptionInfoMarshaller());
- format.addMarshaller(new ReplayCommandMarshaller());
- format.addMarshaller(new ResponseMarshaller());
- format.addMarshaller(new SessionIdMarshaller());
- format.addMarshaller(new SessionInfoMarshaller());
- format.addMarshaller(new ShutdownInfoMarshaller());
- format.addMarshaller(new SubscriptionInfoMarshaller());
- format.addMarshaller(new TransactionInfoMarshaller());
- format.addMarshaller(new WireFormatInfoMarshaller());
- format.addMarshaller(new XATransactionIdMarshaller());
-
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// NOTE!: This file is autogenerated - do not modify!
+// if you need to make a change, please see the Groovy scripts in the
+// activemq-core module
+//
+
+using System;
+using System.Collections;
+using System.IO;
+
+using ActiveMQ.Commands;
+using ActiveMQ.OpenWire;
+using ActiveMQ.OpenWire.V2;
+
+namespace ActiveMQ.OpenWire.V2
+{
+ /// <summary>
+ /// Used to create marshallers for a specific version of the wire protocol
+ /// </summary>
+ public class MarshallerFactory : IMarshallerFactory
+ {
+ public void configure(OpenWireFormat format)
+ {
+ format.clearMarshallers();
+ format.addMarshaller(new ActiveMQBytesMessageMarshaller());
+ format.addMarshaller(new ActiveMQMapMessageMarshaller());
+ format.addMarshaller(new ActiveMQMessageMarshaller());
+ format.addMarshaller(new ActiveMQObjectMessageMarshaller());
+ format.addMarshaller(new ActiveMQQueueMarshaller());
+ format.addMarshaller(new ActiveMQStreamMessageMarshaller());
+ format.addMarshaller(new ActiveMQTempQueueMarshaller());
+ format.addMarshaller(new ActiveMQTempTopicMarshaller());
+ format.addMarshaller(new ActiveMQTextMessageMarshaller());
+ format.addMarshaller(new ActiveMQTopicMarshaller());
+ format.addMarshaller(new BrokerIdMarshaller());
+ format.addMarshaller(new BrokerInfoMarshaller());
+ format.addMarshaller(new ConnectionControlMarshaller());
+ format.addMarshaller(new ConnectionErrorMarshaller());
+ format.addMarshaller(new ConnectionIdMarshaller());
+ format.addMarshaller(new ConnectionInfoMarshaller());
+ format.addMarshaller(new ConsumerControlMarshaller());
+ format.addMarshaller(new ConsumerIdMarshaller());
+ format.addMarshaller(new ConsumerInfoMarshaller());
+ format.addMarshaller(new ControlCommandMarshaller());
+ format.addMarshaller(new DataArrayResponseMarshaller());
+ format.addMarshaller(new DataResponseMarshaller());
+ format.addMarshaller(new DestinationInfoMarshaller());
+ format.addMarshaller(new DiscoveryEventMarshaller());
+ format.addMarshaller(new ExceptionResponseMarshaller());
+ format.addMarshaller(new FlushCommandMarshaller());
+ format.addMarshaller(new IntegerResponseMarshaller());
+ format.addMarshaller(new JournalQueueAckMarshaller());
+ format.addMarshaller(new JournalTopicAckMarshaller());
+ format.addMarshaller(new JournalTraceMarshaller());
+ format.addMarshaller(new JournalTransactionMarshaller());
+ format.addMarshaller(new KeepAliveInfoMarshaller());
+ format.addMarshaller(new LastPartialCommandMarshaller());
+ format.addMarshaller(new LocalTransactionIdMarshaller());
+ format.addMarshaller(new MessageAckMarshaller());
+ format.addMarshaller(new MessageDispatchMarshaller());
+ format.addMarshaller(new MessageDispatchNotificationMarshaller());
+ format.addMarshaller(new MessageIdMarshaller());
+ format.addMarshaller(new MessagePullMarshaller());
+ format.addMarshaller(new NetworkBridgeFilterMarshaller());
+ format.addMarshaller(new PartialCommandMarshaller());
+ format.addMarshaller(new ProducerIdMarshaller());
+ format.addMarshaller(new ProducerInfoMarshaller());
+ format.addMarshaller(new RemoveInfoMarshaller());
+ format.addMarshaller(new RemoveSubscriptionInfoMarshaller());
+ format.addMarshaller(new ReplayCommandMarshaller());
+ format.addMarshaller(new ResponseMarshaller());
+ format.addMarshaller(new SessionIdMarshaller());
+ format.addMarshaller(new SessionInfoMarshaller());
+ format.addMarshaller(new ShutdownInfoMarshaller());
+ format.addMarshaller(new SubscriptionInfoMarshaller());
+ format.addMarshaller(new TransactionInfoMarshaller());
+ format.addMarshaller(new WireFormatInfoMarshaller());
+ format.addMarshaller(new XATransactionIdMarshaller());
+
+ }
+ }
+}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Mon Oct 9 13:18:25 2006
@@ -75,7 +75,7 @@
public void Oneway(Command command)
{
- wireformat.Marshal(command, socketWriter);
+ Wireformat.Marshal(command, socketWriter);
socketWriter.Flush();
}
@@ -104,7 +104,7 @@
{
try
{
- Command command = (Command) wireformat.Unmarshal(socketReader);
+ Command command = (Command) Wireformat.Unmarshal(socketReader);
this.commandHandler(this, command);
}
catch (ObjectDisposedException)
@@ -135,6 +135,11 @@
set { this.exceptionHandler = value; }
}
+ public OpenWireFormat Wireformat
+ {
+ get { return wireformat; }
+ set { wireformat = value; }
+ }
}
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs Mon Oct 9 13:18:25 2006
@@ -36,15 +36,17 @@
{
// Console.WriteLine("Opening socket to: " + host + " on port: " + port);
Socket socket = Connect(location.Host, location.Port);
- ITransport rc = new TcpTransport(socket);
+ TcpTransport tcpTransport = new TcpTransport(socket);
+ ITransport rc = tcpTransport;
- // At present the URI is parsed for options by the ConnectionFactory
+ rc = new WireFormatNegotiator(rc, tcpTransport.Wireformat);
+
+ // At present the URI is parsed for options by the ConnectionFactory
if (UseLogging) {
rc = new LoggingTransport(rc);
}
rc = new ResponseCorrelator(rc);
rc = new MutexTransport(rc);
- rc = new WireFormatNegotiator(rc);
return rc;
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs Mon Oct 9 13:18:25 2006
@@ -1,50 +1,106 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
-using System;
-
-namespace ActiveMQ.Transport
-{
-
- /// <summary>
- /// A Transport which negotiates the wire format
- /// </summary>
- public class WireFormatNegotiator : TransportFilter
- {
-
- public WireFormatNegotiator(ITransport next) : base(next) {
- }
-
- public override void Start() {
- base.Start();
-
-
- // now lets start the protocol negotiation
- WireFormatInfo info = new WireFormatInfo();
- info.StackTraceEnabled=false;
- info.TightEncodingEnabled=false;
- info.TcpNoDelayEnabled=false;
- info.CacheEnabled=false;
- info.SizePrefixDisabled=false;
- info.Version = 1;
-
- Oneway(info);
- }
- }
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+using System.Threading;
+using ActiveMQ.Commands;
+using ActiveMQ.OpenWire;
+using ActiveMQ.Transport;
+using System;
+using ActiveMQ.Util;
+
+namespace ActiveMQ.Transport
+{
+
+ /// <summary>
+ /// A Transport which negotiates the wire format
+ /// </summary>
+ public class WireFormatNegotiator : TransportFilter
+ {
+ private OpenWireFormat wireFormat;
+ private TimeSpan negotiateTimeout=new TimeSpan(0,0,15);
+
+ private AtomicBoolean firstStart=new AtomicBoolean(true);
+ private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
+ private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
+
+ public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
+ : base(next)
+ {
+ this.wireFormat = wireFormat;
+ }
+
+ public override void Start() {
+ base.Start();
+ if (firstStart.compareAndSet(true, false))
+ {
+ try
+ {
+ next.Oneway(wireFormat.PreferedWireFormatInfo);
+ }
+ finally
+ {
+ wireInfoSentDownLatch.countDown();
+ }
+ }
+ }
+
+ public override void Dispose() {
+ base.Dispose();
+ readyCountDownLatch.countDown();
+ }
+
+ public override void Oneway(Command command)
+ {
+ if (!readyCountDownLatch.await(negotiateTimeout))
+ throw new IOException("Wire format negociation timeout: peer did not send his wire format.");
+ next.Oneway(command);
+ }
+
+ protected override void OnCommand(ITransport sender, Command command)
+ {
+ if ( command.GetDataStructureType() == WireFormatInfo.ID_WireFormatInfo )
+ {
+ WireFormatInfo info = (WireFormatInfo)command;
+ try
+ {
+ if (!info.Valid)
+ {
+ throw new IOException("Remote wire format magic is invalid");
+ }
+ wireInfoSentDownLatch.await(negotiateTimeout);
+ wireFormat.renegotiateWireFormat(info);
+ }
+ catch (Exception e)
+ {
+ OnException(this, e);
+ }
+ finally
+ {
+ readyCountDownLatch.countDown();
+ }
+ }
+ this.commandHandler(sender, command);
+ }
+
+ protected override void OnException(ITransport sender, Exception command)
+ {
+ readyCountDownLatch.countDown();
+ this.exceptionHandler(sender, command);
+ }
+ }
+}
+
Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs?view=auto&rev=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs Mon Oct 9 13:18:25 2006
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace ActiveMQ.Util
+{
+ class AtomicBoolean
+ {
+ bool value;
+
+ public AtomicBoolean(bool b)
+ {
+ value = b;
+ }
+
+ public bool compareAndSet(bool expected, bool newValue)
+ {
+ lock(this)
+ {
+ if (value == expected)
+ {
+ value = newValue;
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+}
Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs?view=auto&rev=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs Mon Oct 9 13:18:25 2006
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+
+namespace ActiveMQ.Util
+{
+ class CountDownLatch
+ {
+ int remaining;
+ public CountDownLatch(int i)
+ {
+ remaining=i;
+ }
+
+ public void countDown()
+ {
+ lock(this)
+ {
+ if( remaining > 0 ) {
+ remaining--;
+ Monitor.PulseAll(this);
+ }
+ }
+ }
+
+ public bool await(TimeSpan timeout)
+ {
+ lock (this)
+ {
+ if (remaining > 0)
+ {
+ Monitor.Wait(this, timeout);
+ if (remaining > 0)
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+}
Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs?view=auto&rev=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs Mon Oct 9 13:18:25 2006
@@ -0,0 +1,27 @@
+using System;
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+//------------------------------------------------------------------------------
+// <auto-generated>
+// This code was generated by a tool.
+// Runtime Version:2.0.50727.42
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+[assembly: ComVisibleAttribute(false)]
+[assembly: CLSCompliantAttribute(true)]
+[assembly: AssemblyTitleAttribute("Apache NMS for MSMQ")]
+[assembly: AssemblyDescriptionAttribute("An NMS (.Net Messaging Library) to MSMQ")]
+[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
+[assembly: AssemblyCompanyAttribute("http://incubator.apache.org/activemq/")]
+[assembly: AssemblyProductAttribute("Apache ActiveMQ")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2006 Apache Software Foundation")]
+[assembly: AssemblyTrademarkAttribute("")]
+[assembly: AssemblyCultureAttribute("")]
+[assembly: AssemblyVersionAttribute("4.0")]
+[assembly: AssemblyInformationalVersionAttribute("4.0")]
+
Modified: incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs?view=diff&rev=454489&r1=454488&r2=454489
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs Mon Oct 9 13:18:25 2006
@@ -196,7 +196,7 @@
protected virtual string CreateDestinationName()
{
- return "Test.DotNet." + GetType().Name;
+ return "Test.DotNet." + GetType().Name + "." + DateTime.Now.Ticks;
}
protected virtual IMessage CreateMessage()