You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/10/25 03:02:56 UTC
svn commit: r707803 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/
src/main/csharp/Commands/ src/main/csharp/State/ src/main/csharp/Threads/
src/main/csharp/Transport/ src/main/csharp/Transport/Discovery/
src/main/csharp/...
Author: jgomes
Date: Fri Oct 24 18:02:55 2008
New Revision: 707803
URL: http://svn.apache.org/viewvc?rev=707803&view=rev
Log:
Integrate patch submitted from Robert Walls for Multicast Discovery support. Thanks, Robert!
Fixes [AMQNET-98]. (See https://issues.apache.org/activemq/browse/AMQNET-98)
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/ (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/ (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ProducerAck.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs Fri Oct 24 18:02:55 2008
@@ -440,10 +440,6 @@
}
}
- /// <summary>
- /// </summary>
- /// <returns>a list of child destinations if this destination represents a composite destination.</returns>
-
/*public List GetChildDestinations() {
List answer = new ArrayList();
StringTokenizer iter = new StringTokenizer(physicalName, COMPOSITE_SEPARATOR);
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ProducerAck.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ProducerAck.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ProducerAck.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ProducerAck.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,17 +20,14 @@
namespace Apache.NMS.ActiveMQ.Commands
{
-
- /**
- * A ProducerAck command is sent by a broker to a producer to let it know it has
- * received and processed messages that it has produced. The producer will be
- * flow controlled if it does not receive ProducerAck commands back from the
- * broker.
- *
- */
+ /// <summary>
+ /// A ProducerAck command is sent by a broker to a producer to let it know it has
+ /// received and processed messages that it has produced. The producer will be
+ /// flow controlled if it does not receive ProducerAck commands back from the
+ /// broker.
+ /// </summary>
public class ProducerAck : BaseCommand
{
-
protected ProducerId myProducerId;
protected int mySize;
@@ -49,10 +46,9 @@
return visitor.processProducerAck(this);
}
- /**
- * The producer id that this ack message is destined for.
- *
- */
+ /// <summary>
+ /// The producer id that this ack message is destined for.
+ /// </summary>
public ProducerId ProducerId
{
get
@@ -65,10 +61,9 @@
}
}
- /**
- * The number of bytes that are being acked.
- *
- */
+ /// <summary>
+ /// The number of bytes that are being acked.
+ /// </summary>
public int Size
{
get
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Fri Oct 24 18:02:55 2008
@@ -30,22 +30,23 @@
public const string DEFAULT_BROKER_URL = "activemq:tcp://localhost:61616";
public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
+ private static event ExceptionListener onException;
private Uri brokerUri;
private string connectionUserName;
private string connectionPassword;
private string clientId;
+ static ConnectionFactory()
+ {
+ TransportFactory.OnException += ConnectionFactory.ExceptionHandler;
+ }
+
public static string GetDefaultBrokerUrl()
{
#if (PocketPC||NETCF||NETCF_2_0)
return DEFAULT_BROKER_URL;
#else
- string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
- if(answer == null)
- {
- answer = DEFAULT_BROKER_URL;
- }
- return answer;
+ return Environment.GetEnvironmentVariable(ENV_BROKER_URL) ?? DEFAULT_BROKER_URL;
#endif
}
@@ -83,10 +84,16 @@
public IConnection CreateConnection(string userName, string password)
{
Uri uri = brokerUri;
- // Do we need to strip off the activemq prefix??
- if("activemq".Equals(brokerUri.Scheme))
+ string scheme = brokerUri.Scheme;
+
+ if(null != scheme)
{
- uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
+ // Do we need to strip off the activemq prefix??
+ scheme = scheme.ToLower();
+ if("activemq".Equals(scheme))
+ {
+ uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
+ }
}
ConnectionInfo info = CreateConnectionInfo(userName, password);
@@ -129,7 +136,17 @@
set { clientId = value; }
}
- // Implementation methods
+ public event ExceptionListener OnException
+ {
+ add { onException += value; }
+ remove
+ {
+ if(onException != null)
+ {
+ onException -= value;
+ }
+ }
+ }
protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
{
@@ -140,14 +157,8 @@
answer.ConnectionId = connectionId;
answer.UserName = userName;
answer.Password = password;
- if(clientId == null)
- {
- answer.ClientId = CreateNewGuid();
- }
- else
- {
- answer.ClientId = clientId;
- }
+ answer.ClientId = clientId ?? CreateNewGuid();
+
return answer;
}
@@ -156,5 +167,12 @@
return Guid.NewGuid().ToString();
}
+ protected static void ExceptionHandler(Exception ex)
+ {
+ if(ConnectionFactory.onException != null)
+ {
+ ConnectionFactory.onException(ex);
+ }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ.State
{
public class Tracked : Response
{
-
private ThreadSimulator runnable = null;
public Tracked(ThreadSimulator runnable)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,7 +19,7 @@
{
public class DefaultThreadPools
{
- /***
+ /*
* Java's execution model is different enough that I have left out
* the Executure concept in this implementation. This must be
* reviewed to see what is appropriate for the future.
@@ -41,7 +41,7 @@
{
get { return DEFAULT_POOL; }
}
- ***/
+ */
private static TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -54,14 +54,13 @@
ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
}
- /**
- * We Expect MANY wakeup calls on the same TaskRunner.
- */
+ /// <summary>
+ /// We Expect MANY wakeup calls on the same TaskRunner.
+ /// </summary>
public void wakeup()
{
lock(runable)
{
-
// When we get in here, we make some assumptions of state:
// queued=false, iterating=false: wakeup() has not be called and
// therefore task is not executing.
@@ -88,10 +87,10 @@
}
}
- /**
- * shut down the task
- *
- */
+ /// <summary>
+ /// shut down the task
+ /// </summary>
+ /// <param name="timeout"></param>
public void shutdown(int timeout)
{
lock(runable)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -29,7 +29,6 @@
/// </summary>
public class TaskRunnerFactory
{
-
private int maxIterationsPerRun;
private String name;
private ThreadPriority priority;
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/
------------------------------------------------------------------------------
bugtraq:label = Issue#:
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/
------------------------------------------------------------------------------
--- bugtraq:message (added)
+++ bugtraq:message Fri Oct 24 18:02:55 2008
@@ -0,0 +1 @@
+Fixes [AMQNET-%BUGID%]. (See https://issues.apache.org/activemq/browse/AMQNET-%BUGID%)
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/
------------------------------------------------------------------------------
bugtraq:url = https://issues.apache.org/activemq/browse/AMQNET-%BUGID%
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs?rev=707803&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs Fri Oct 24 18:02:55 2008
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transport.Discovery.Multicast;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+ public class DiscoveryTransportFactory : ITransportFactory
+ {
+ private const int TIMEOUT_IN_SECONDS = 20;
+
+ private static Uri discoveredUri;
+ private static MulticastDiscoveryAgent agent;
+ private static string currentServiceName;
+ private static readonly object uriLock = new object();
+ public static event ExceptionListener OnException;
+
+ public DiscoveryTransportFactory()
+ {
+ currentServiceName = String.Empty;
+ }
+
+ private static void agent_OnNewServiceFound(string brokerName, string serviceName)
+ {
+ lock(uriLock)
+ {
+ if(discoveredUri == null)
+ {
+ currentServiceName = serviceName;
+ discoveredUri = new Uri(currentServiceName);
+ }
+
+ // This will end the wait in the CreateTransport method.
+ Monitor.Pulse(uriLock);
+ }
+ }
+
+ private static void agent_OnServiceRemoved(string brokerName, string serviceName)
+ {
+ if(serviceName == currentServiceName)
+ {
+ lock(uriLock)
+ {
+ discoveredUri = null;
+ }
+
+ if(OnException != null)
+ {
+ OnException(new Exception("Broker is dead!"));
+ }
+ }
+ }
+
+ private static MulticastDiscoveryAgent Agent
+ {
+ get
+ {
+ if(agent == null)
+ {
+ agent = new MulticastDiscoveryAgent();
+ agent.OnNewServiceFound += agent_OnNewServiceFound;
+ agent.OnServiceRemoved += agent_OnServiceRemoved;
+ }
+
+ return agent;
+ }
+ }
+
+ #region Overloaded FailoverTransportFactory Members
+
+ public ITransport CreateTransport(Uri location)
+ {
+ if(!Agent.IsStarted)
+ {
+ Agent.Start();
+ }
+
+ DateTime expireTime = DateTime.Now.AddSeconds(TIMEOUT_IN_SECONDS);
+
+ // If a new broker is found the agent will fire an event which will result in discoveredUri being set.
+ lock(uriLock)
+ {
+ while(discoveredUri == null)
+ {
+ if(expireTime < DateTime.Now)
+ {
+ throw new NMSConnectionException(
+ "Unable to find a connection before the timeout period expired.");
+ }
+
+ Monitor.Wait(uriLock, TIMEOUT_IN_SECONDS * 1000);
+ }
+ }
+
+ ITransport transport;
+
+ lock(uriLock)
+ {
+ TcpTransportFactory tcpTransFactory = new TcpTransportFactory();
+
+ transport = tcpTransFactory.CreateTransport(new Uri(discoveredUri + location.Query));
+ }
+
+ return transport;
+ }
+
+ public ITransport CompositeConnect(Uri location)
+ {
+ throw new NMSConnectionException("Composite connection not supported with Discovery transport.");
+ }
+
+ #endregion
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/
------------------------------------------------------------------------------
bugtraq:label = Issue#:
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/
------------------------------------------------------------------------------
--- bugtraq:message (added)
+++ bugtraq:message Fri Oct 24 18:02:55 2008
@@ -0,0 +1 @@
+Fixes [AMQNET-%BUGID%]. (See https://issues.apache.org/activemq/browse/AMQNET-%BUGID%)
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/
------------------------------------------------------------------------------
bugtraq:url = https://issues.apache.org/activemq/browse/AMQNET-%BUGID%
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs?rev=707803&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs Fri Oct 24 18:02:55 2008
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.ComponentModel;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
+{
+ internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
+ internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
+
+ internal class MulticastDiscoveryAgent : IDisposable
+ {
+ public const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
+ public const int DEFAULT_BACKOFF_MILLISECONDS = 100;
+ public const int BACKOFF_MULTIPLIER = 2;
+ public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://localhost:6155";
+ private const string TYPE_SUFFIX = "ActiveMQ-4.";
+ private const string ALIVE = "alive";
+ private const string DEAD = "dead";
+ private const char DELIMITER = '%';
+ private const int BUFF_SIZE = 8192;
+ private const string DEFAULT_GROUP = "default";
+ private const int EXPIRATION_OFFSET_IN_SECONDS = 2;
+ private const int WORKER_KILL_TIME_SECONDS = 10;
+ private const int SOCKET_TIMEOUT_MILLISECONDS = 500;
+
+ private string group;
+ private bool isStarted = false;
+ private readonly Uri discoveryUri;
+ private Socket multicastSocket;
+ private IPEndPoint endPoint;
+ private BackgroundWorker worker;
+
+ private event NewBrokerServiceFound onNewServiceFound;
+ private event BrokerServiceRemoved onServiceRemoved;
+
+ /// <summary>
+ /// Indexed by service name
+ /// </summary>
+ private readonly Dictionary<string, RemoteBrokerData> remoteBrokers;
+
+ public MulticastDiscoveryAgent()
+ {
+ discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
+ group = DEFAULT_GROUP;
+ remoteBrokers = new Dictionary<string, RemoteBrokerData>();
+ }
+
+ public void Start()
+ {
+ if(!isStarted)
+ {
+ isStarted = true;
+
+ if(multicastSocket == null)
+ {
+ int numFailedAttempts = 0;
+ int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
+
+ while(!TryToConnectSocket())
+ {
+ numFailedAttempts++;
+
+ if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
+ {
+ throw new ApplicationException(
+ "Could not open the socket in order to discover advertising brokers.");
+ }
+
+ Thread.Sleep(backoffTime);
+ backoffTime *= BACKOFF_MULTIPLIER;
+ }
+ }
+
+ if(worker == null)
+ {
+ worker = new BackgroundWorker();
+ worker.DoWork += worker_DoWork;
+ }
+
+ if(!worker.IsBusy)
+ {
+ worker.RunWorkerAsync();
+ }
+
+ }
+ }
+
+ public void Stop()
+ {
+ isStarted = false;
+
+ DateTime expireTime = DateTime.Now.AddSeconds(WORKER_KILL_TIME_SECONDS);
+
+ //wait for the worker to stop. Give it up to WORKER_KILL_TIME_SECONDS
+ while(worker.IsBusy)
+ {
+ if(expireTime < DateTime.Now)
+ {
+ throw new ApplicationException("Unable to stop the worker thread.");
+ }
+ }
+ }
+
+ private bool TryToConnectSocket()
+ {
+ bool hasSucceeded = false;
+
+ try
+ {
+ multicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
+ endPoint = new IPEndPoint(IPAddress.Any, discoveryUri.Port);
+
+ //We have to allow reuse in the multicast socket. Otherwise, we would be unable to use multiple clients on the same machine.
+ multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
+ multicastSocket.Bind(endPoint);
+
+ IPAddress ip = IPAddress.Parse(discoveryUri.Host);
+
+ multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
+ new MulticastOption(ip, IPAddress.Any));
+ multicastSocket.ReceiveTimeout = SOCKET_TIMEOUT_MILLISECONDS;
+ hasSucceeded = true;
+ }
+ catch(SocketException)
+ {
+ }
+
+ return hasSucceeded;
+ }
+
+ private void worker_DoWork(object sender, DoWorkEventArgs e)
+ {
+ Thread.CurrentThread.Name = "Discovery Agent Thread.";
+ byte[] buffer = new byte[BUFF_SIZE];
+ string receivedInfoRaw;
+ string receivedInfo;
+
+ while(isStarted)
+ {
+ try
+ {
+ multicastSocket.Receive(buffer);
+ receivedInfoRaw = System.Text.Encoding.UTF8.GetString(buffer);
+ // We have to remove all of the null bytes.
+ receivedInfo = receivedInfoRaw.Substring(0, receivedInfoRaw.IndexOf("\0"));
+ ProcessBrokerMessage(receivedInfo);
+
+ }
+ catch(SocketException)
+ {
+ // There was no multicast message sent before the timeout expired...Let us try again.
+ }
+
+ //We need to clear the buffer.
+ buffer[0] = 0x0;
+ ExpireOldServices();
+ }
+ }
+
+ private void ProcessBrokerMessage(string message)
+ {
+ string payload;
+ string brokerName;
+ string serviceName;
+
+ if(message.StartsWith(MulticastType))
+ {
+ payload = message.Substring(MulticastType.Length);
+ brokerName = GetBrokerName(payload);
+ serviceName = GetServiceName(payload);
+
+ if(payload.StartsWith(ALIVE))
+ {
+ ProcessAliveBrokerMessage(brokerName, serviceName);
+ }
+ else if(payload.StartsWith(DEAD))
+ {
+ ProcessDeadBrokerMessage(brokerName, serviceName);
+ }
+ else
+ {
+ //Malformed Payload
+ }
+ }
+ }
+
+ private void ProcessDeadBrokerMessage(string brokerName, string serviceName)
+ {
+ if(remoteBrokers.ContainsKey(serviceName))
+ {
+ remoteBrokers.Remove(serviceName);
+ if(onServiceRemoved != null)
+ {
+ onServiceRemoved(brokerName, serviceName);
+ }
+ }
+ }
+
+ private void ProcessAliveBrokerMessage(string brokerName, string serviceName)
+ {
+ if(remoteBrokers.ContainsKey(serviceName))
+ {
+ remoteBrokers[serviceName].UpdateHeartBeat();
+ }
+ else
+ {
+ remoteBrokers.Add(serviceName, new RemoteBrokerData(brokerName, serviceName));
+
+ if(onNewServiceFound != null)
+ {
+ onNewServiceFound(brokerName, serviceName);
+ }
+ }
+ }
+
+ private static string GetBrokerName(string payload)
+ {
+ string[] results = payload.Split(DELIMITER);
+ return results[1];
+ }
+
+ private static string GetServiceName(string payload)
+ {
+ string[] results = payload.Split(DELIMITER);
+ return results[2];
+ }
+
+ private void ExpireOldServices()
+ {
+ DateTime expireTime;
+ List<RemoteBrokerData> deadServices = new List<RemoteBrokerData>();
+
+ foreach(KeyValuePair<string, RemoteBrokerData> brokerService in remoteBrokers)
+ {
+ expireTime = brokerService.Value.lastHeartBeat.AddSeconds(EXPIRATION_OFFSET_IN_SECONDS);
+ if(DateTime.Now > expireTime)
+ {
+ deadServices.Add(brokerService.Value);
+ }
+ }
+
+ // Remove all of the dead services
+ for(int i = 0; i < deadServices.Count; i++)
+ {
+ ProcessDeadBrokerMessage(deadServices[i].brokerName, deadServices[i].serviceName);
+ }
+ }
+
+ public bool IsStarted
+ {
+ get { return isStarted; }
+ }
+
+ public string Group
+ {
+ get { return group; }
+ set { group = value; }
+ }
+
+ internal string MulticastType
+ {
+ get { return group + "." + TYPE_SUFFIX; }
+ }
+
+ internal event NewBrokerServiceFound OnNewServiceFound
+ {
+ add { onNewServiceFound += value; }
+ remove { onNewServiceFound -= value; }
+ }
+
+ internal event BrokerServiceRemoved OnServiceRemoved
+ {
+ add { onServiceRemoved += value; }
+ remove { onServiceRemoved += value; }
+ }
+
+ public void Dispose()
+ {
+ if(isStarted)
+ {
+ Stop();
+ }
+
+ multicastSocket.Shutdown(SocketShutdown.Both);
+ multicastSocket = null;
+ }
+
+ internal class RemoteBrokerData
+ {
+ internal string brokerName;
+ internal string serviceName;
+ internal DateTime lastHeartBeat;
+
+ internal RemoteBrokerData(string brokerName, string serviceName)
+ {
+ this.brokerName = brokerName;
+ this.serviceName = serviceName;
+ this.lastHeartBeat = DateTime.Now;
+ }
+
+ internal void UpdateHeartBeat()
+ {
+ this.lastHeartBeat = DateTime.Now;
+ }
+
+ }
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -44,7 +44,6 @@
private Mutex reconnectMutex = new Mutex();
private Mutex backupMutex = new Mutex();
private Mutex sleepMutex = new Mutex();
- private Mutex listenerMutex = new Mutex();
private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
@@ -75,14 +74,8 @@
public TimeSpan RequestTimeout
{
- get
- {
- return requestTimeout;
- }
- set
- {
- requestTimeout = value;
- }
+ get { return requestTimeout; }
+ set { requestTimeout = value; }
}
private class FailoverTask : Task
@@ -102,7 +95,7 @@
try
{
parent.backupMutex.WaitOne();
- if(parent.ConnectedTransport == null && !parent.disposed)
+ if(parent.ConnectedTransport == null && doReconnect)
{
result = parent.doReconnect();
buildBackup = false;
@@ -112,6 +105,7 @@
{
parent.backupMutex.ReleaseMutex();
}
+
if(buildBackup)
{
parent.buildBackups();
@@ -165,12 +159,14 @@
{
}
}
+
Tracked t = oo as Tracked;
if(t != null)
{
t.onResponses();
}
}
+
if(!initialized)
{
if(command.IsBrokerInfo)
@@ -185,10 +181,12 @@
add(brokerString);
}
}
+
initialized = true;
}
}
}
+
this.Command(sender, command);
}
@@ -295,6 +293,7 @@
{
return;
}
+
started = false;
disposed = true;
connected = false;
@@ -313,6 +312,7 @@
{
reconnectMutex.ReleaseMutex();
}
+
try
{
sleepMutex.WaitOne();
@@ -321,6 +321,7 @@
{
sleepMutex.ReleaseMutex();
}
+
reconnectTask.shutdown();
if(transportToStop != null)
{
@@ -330,146 +331,74 @@
public int InitialReconnectDelay
{
- get
- {
- return _initialReconnectDelay;
- }
- set
- {
- _initialReconnectDelay = value;
- }
+ get { return _initialReconnectDelay; }
+ set { _initialReconnectDelay = value; }
}
public int MaxReconnectDelay
{
- get
- {
- return _maxReconnectDelay;
- }
- set
- {
- _maxReconnectDelay = value;
- }
+ get { return _maxReconnectDelay; }
+ set { _maxReconnectDelay = value; }
}
public int ReconnectDelay
{
- get
- {
- return _reconnectDelay;
- }
- set
- {
- _reconnectDelay = value;
- }
+ get { return _reconnectDelay; }
+ set { _reconnectDelay = value; }
}
public int ReconnectDelayExponent
{
- get
- {
- return _backOffMultiplier;
- }
- set
- {
- _backOffMultiplier = value;
- }
+ get { return _backOffMultiplier; }
+ set { _backOffMultiplier = value; }
}
public ITransport ConnectedTransport
{
- get
- {
- return connectedTransport.Value;
- }
- set
- {
- connectedTransport.Value = value;
- }
+ get { return connectedTransport.Value; }
+ set { connectedTransport.Value = value; }
}
public Uri ConnectedTransportURI
{
- get
- {
- return connectedTransportURI;
- }
- set
- {
- connectedTransportURI = value;
- }
+ get { return connectedTransportURI; }
+ set { connectedTransportURI = value; }
}
public int MaxReconnectAttempts
{
- get
- {
- return _maxReconnectAttempts;
- }
- set
- {
- _maxReconnectAttempts = value;
- }
+ get { return _maxReconnectAttempts; }
+ set { _maxReconnectAttempts = value; }
}
public bool Randomize
{
- get
- {
- return _randomize;
- }
- set
- {
- _randomize = value;
- }
+ get { return _randomize; }
+ set { _randomize = value; }
}
public bool Backup
{
- get
- {
- return _backup;
- }
- set
- {
- _backup = value;
- }
+ get { return _backup; }
+ set { _backup = value; }
}
public int BackupPoolSize
{
- get
- {
- return _backupPoolSize;
- }
- set
- {
- _backupPoolSize = value;
- }
+ get { return _backupPoolSize; }
+ set { _backupPoolSize = value; }
}
public bool TrackMessages
{
- get
- {
- return _trackMessages;
- }
- set
- {
- _trackMessages = value;
- }
+ get { return _trackMessages; }
+ set { _trackMessages = value; }
}
public int MaxCacheSize
{
- get
- {
- return _maxCacheSize;
- }
- set
- {
- _maxCacheSize = value;
- }
+ get { return _maxCacheSize; }
+ set { _maxCacheSize = value; }
}
/// <summary>
@@ -486,7 +415,6 @@
Exception error = null;
try
{
-
try
{
reconnectMutex.WaitOne();
@@ -498,6 +426,7 @@
// Skipping send of ShutdownInfo command when not connected.
return;
}
+
if(command is RemoveInfo)
{
// Simulate response to RemoveInfo command
@@ -539,6 +468,7 @@
{
reconnectMutex.WaitOne();
}
+
transport = ConnectedTransport;
}
@@ -630,6 +560,7 @@
Thread.CurrentThread.Interrupt();
throw new ThreadInterruptedException();
}
+
if(!disposed)
{
if(error != null)
@@ -667,6 +598,7 @@
}
}
}
+
Reconnect();
}
@@ -679,6 +611,7 @@
uris.Remove(u[i]);
}
}
+
Reconnect();
}
@@ -750,6 +683,7 @@
{
removed = l.Remove(failedConnectTransportURI);
}
+
if(Randomize)
{
// Randomly, reorder the list by random swapping
@@ -762,10 +696,12 @@
l[i] = t;
}
}
+
if(removed)
{
l.Add(failedConnectTransportURI);
}
+
return l;
}
}
@@ -783,6 +719,7 @@
{
tmpMap = new Dictionary<int, Command>(requestMap);
}
+
foreach(Command command in tmpMap.Values)
{
t.Oneway(command);
@@ -791,14 +728,8 @@
public bool UseExponentialBackOff
{
- get
- {
- return _useExponentialBackOff;
- }
- set
- {
- _useExponentialBackOff = value;
- }
+ get { return _useExponentialBackOff; }
+ set { _useExponentialBackOff = value; }
}
public override String ToString()
@@ -821,10 +752,7 @@
public bool IsFaultTolerant
{
- get
- {
- return true;
- }
+ get { return true; }
}
bool doReconnect()
@@ -927,6 +855,7 @@
{
Tracer.Info("Successfully reconnected to " + uri);
}
+
connected = true;
return false;
}
@@ -943,9 +872,7 @@
{
Tracer.Error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
-
onException(this, connectionFailure);
-
return false;
}
}
@@ -953,6 +880,7 @@
{
reconnectMutex.ReleaseMutex();
}
+
if(!disposed)
{
@@ -1004,6 +932,7 @@
backups.Remove(bt);
}
}
+
foreach(Uri uri in connectList)
{
if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
@@ -1028,6 +957,7 @@
Tracer.Debug("Failed to build backup ");
}
}
+
if(backups.Count < BackupPoolSize)
{
break;
@@ -1039,23 +969,18 @@
{
backupMutex.ReleaseMutex();
}
+
return false;
}
public bool IsDisposed
{
- get
- {
- return disposed;
- }
+ get { return disposed; }
}
public bool Connected
{
- get
- {
- return connected;
- }
+ get { return connected; }
}
public void Reconnect(Uri uri)
@@ -1080,34 +1005,19 @@
public CommandHandler Command
{
- get
- {
- return _commandHandler;
- }
- set
- {
- _commandHandler = value;
- }
+ get { return _commandHandler; }
+ set { _commandHandler = value; }
}
public ExceptionHandler Exception
{
- get
- {
- return _exceptionHandler;
- }
- set
- {
- _exceptionHandler = value;
- }
+ get { return _exceptionHandler; }
+ set { _exceptionHandler = value; }
}
public bool IsStarted
{
- get
- {
- return started;
- }
+ get { return started; }
}
public void Dispose()
@@ -1122,6 +1032,7 @@
{
// get rid of unmanaged stuff
}
+
disposed = true;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs Fri Oct 24 18:02:55 2008
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Threading;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs Fri Oct 24 18:02:55 2008
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Transport.Discovery;
using Apache.NMS.ActiveMQ.Transport.Failover;
using Apache.NMS.ActiveMQ.Transport.Tcp;
@@ -25,13 +26,49 @@
{
public class TransportFactory
{
-
- private static Dictionary<String, ITransportFactory> TRANSPORT_FACTORYS = new Dictionary<String, ITransportFactory>();
+ private static readonly Dictionary<String, ITransportFactory> factoryCache;
+ public static event ExceptionListener OnException;
static TransportFactory()
{
- TRANSPORT_FACTORYS.Add("tcp", new TcpTransportFactory());
- TRANSPORT_FACTORYS.Add("failover", new FailoverTransportFactory());
+ TransportFactory.factoryCache = new Dictionary<string, ITransportFactory>();
+ }
+
+ private static void HandleException(Exception ex)
+ {
+ if(TransportFactory.OnException != null)
+ {
+ TransportFactory.OnException(ex);
+ }
+ }
+
+ private static ITransportFactory AddTransportFactory(string scheme)
+ {
+ ITransportFactory factory;
+
+ switch(scheme.ToLower())
+ {
+ case "tcp":
+ factory = new TcpTransportFactory();
+ break;
+ case "discovery":
+ factory = new DiscoveryTransportFactory();
+ DiscoveryTransportFactory.OnException += TransportFactory.HandleException;
+ break;
+ case "failover":
+ factory = new FailoverTransportFactory();
+ break;
+ default:
+ throw new ApplicationException("The transport " + scheme + " is not supported.");
+ }
+
+ if(null == factory)
+ {
+ throw new ApplicationException("Unable to create a transport.");
+ }
+
+ TransportFactory.factoryCache.Add(scheme, factory);
+ return factory;
}
/// <summary>
@@ -41,32 +78,39 @@
/// <returns>the transport</returns>
public static ITransport CreateTransport(Uri location)
{
- ITransportFactory tf = findTransportFactory(location);
+ ITransportFactory tf = TransportFactory.findTransportFactory(location);
return tf.CreateTransport(location);
}
public static ITransport CompositeConnect(Uri location)
{
- ITransportFactory tf = findTransportFactory(location);
+ ITransportFactory tf = TransportFactory.findTransportFactory(location);
return tf.CompositeConnect(location);
}
/// <summary>
+ /// Find the transport factory for the scheme. We will cache the transport
+ /// factory in a lookup table. If we do not support the transport protocol,
+ /// an ApplicationException will be thrown.
/// </summary>
/// <param name="location"></param>
/// <returns></returns>
private static ITransportFactory findTransportFactory(Uri location)
{
- String scheme = location.Scheme;
- if(scheme == null)
+ string scheme = location.Scheme;
+
+ if(null == scheme)
{
throw new IOException("Transport not scheme specified: [" + location + "]");
}
- ITransportFactory tf = TRANSPORT_FACTORYS[scheme];
- if(tf == null)
+
+ scheme = scheme.ToLower();
+ ITransportFactory tf = TransportFactory.factoryCache[scheme];
+ if(null == tf)
{
- throw new ApplicationException("Transport Factory for " + scheme + " does not exist.");
+ tf = TransportFactory.AddTransportFactory(scheme);
}
+
return tf;
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj?rev=707803&r1=707802&r2=707803&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj Fri Oct 24 18:02:55 2008
@@ -721,6 +721,8 @@
<Compile Include="src\main\csharp\TransactionContext.cs">
<SubType>Code</SubType>
</Compile>
+ <Compile Include="src\main\csharp\Transport\Discovery\DiscoveryTransportFactory.cs" />
+ <Compile Include="src\main\csharp\Transport\Discovery\Multicast\MulticastDiscoveryAgent.cs" />
<Compile Include="src\main\csharp\Transport\Failover\BackupTransport.cs" />
<Compile Include="src\main\csharp\Transport\Failover\FailoverTransport.cs" />
<Compile Include="src\main\csharp\Transport\Failover\FailoverTransportFactory.cs" />