You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/23 21:33:13 UTC
svn commit: r1329394 -
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
Author: tabish
Date: Mon Apr 23 19:33:13 2012
New Revision: 1329394
URL: http://svn.apache.org/viewvc?rev=1329394&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-382
check for host equals "default" and use the default Discovery Uri string.
Fixes [AMQNET-382]. (See https://issues.apache.org/activemq/browse/AMQNET-382)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs?rev=1329394&r1=1329393&r2=1329394&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs Mon Apr 23 19:33:13 2012
@@ -1,369 +1,373 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading;
-using Apache.NMS.ActiveMQ.Transport.Tcp;
-
-namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
-{
- internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
- internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
-
- internal class MulticastDiscoveryAgent : IDisposable
- {
- public const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
- public const int DEFAULT_BACKOFF_MILLISECONDS = 100;
- public const int BACKOFF_MULTIPLIER = 2;
- public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
- private const string TYPE_SUFFIX = "ActiveMQ-4.";
- private const string ALIVE = "alive";
- private const string DEAD = "dead";
- private const char DELIMITER = '%';
- private const int BUFF_SIZE = 8192;
- private const string DEFAULT_GROUP = "default";
- private const int EXPIRATION_OFFSET_IN_SECONDS = 2;
- private const int WORKER_KILL_TIME_SECONDS = 10;
- private const int SOCKET_TIMEOUT_MILLISECONDS = 500;
-
- private string group;
- private readonly object stopstartSemaphore = new object();
- private bool isStarted = false;
- private Uri discoveryUri;
- private Socket multicastSocket;
- private IPEndPoint endPoint;
- private Thread worker;
-
- private event NewBrokerServiceFound onNewServiceFound;
- private event BrokerServiceRemoved onServiceRemoved;
-
- /// <summary>
- /// Indexed by service name
- /// </summary>
- private readonly Dictionary<string, RemoteBrokerData> remoteBrokers;
-
- public MulticastDiscoveryAgent()
- {
- group = DEFAULT_GROUP;
- remoteBrokers = new Dictionary<string, RemoteBrokerData>();
- }
-
- public void Start()
- {
- lock(stopstartSemaphore)
- {
- if (discoveryUri == null)
- {
- discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
- }
-
- if(multicastSocket == null)
- {
- int numFailedAttempts = 0;
- int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
-
- Tracer.Info("Connecting to multicast discovery socket.");
- while(!TryToConnectSocket())
- {
- numFailedAttempts++;
- if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
- {
- throw new ApplicationException(
- "Could not open the socket in order to discover advertising brokers.");
- }
-
- Thread.Sleep(backoffTime);
- backoffTime *= BACKOFF_MULTIPLIER;
- }
- }
-
- if(worker == null)
- {
- Tracer.Info("Starting multicast discovery agent worker thread");
- worker = new Thread(new ThreadStart(worker_DoWork));
- worker.IsBackground = true;
- worker.Start();
- isStarted = true;
- }
- }
- }
-
- public void Stop()
- {
- Thread localThread = null;
-
- lock(stopstartSemaphore)
- {
- Tracer.Info("Stopping multicast discovery agent worker thread");
- localThread = worker;
- worker = null;
- // Changing the isStarted flag will signal the thread that it needs to shut down.
- isStarted = false;
- }
-
- if(localThread != null)
- {
- // wait for the worker to stop.
- if(!localThread.Join(WORKER_KILL_TIME_SECONDS))
- {
- Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
- localThread.Abort();
- }
- }
-
- Tracer.Info("Multicast discovery agent worker thread joined");
- }
-
- private bool TryToConnectSocket()
- {
- bool hasSucceeded = false;
-
- try
- {
- multicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
- endPoint = new IPEndPoint(IPAddress.Any, discoveryUri.Port);
-
- //We have to allow reuse in the multicast socket. Otherwise, we would be unable to use multiple clients on the same machine.
- multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
- multicastSocket.Bind(endPoint);
-
- IPAddress ipaddress;
-
- if(!TcpTransportFactory.TryParseIPAddress(discoveryUri.Host, out ipaddress))
- {
- ipaddress = TcpTransportFactory.GetIPAddress(discoveryUri.Host, AddressFamily.InterNetwork);
- if(null == ipaddress)
- {
- throw new NMSConnectionException("Invalid host address.");
- }
- }
-
- multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
- new MulticastOption(ipaddress, IPAddress.Any));
-#if !NETCF
- multicastSocket.ReceiveTimeout = SOCKET_TIMEOUT_MILLISECONDS;
-#endif
- hasSucceeded = true;
- }
- catch(SocketException)
- {
- }
-
- return hasSucceeded;
- }
-
- private void worker_DoWork()
- {
- Thread.CurrentThread.Name = "Discovery Agent Thread.";
- byte[] buffer = new byte[BUFF_SIZE];
- string receivedInfoRaw;
- string receivedInfo;
-
- while(isStarted)
- {
- try
- {
- int numBytes = multicastSocket.Receive(buffer);
- receivedInfoRaw = System.Text.Encoding.UTF8.GetString(buffer, 0, numBytes);
- // We have to remove all of the null bytes if there are any otherwise we just
- // take the whole string as is.
- if (receivedInfoRaw.IndexOf("\0") != -1)
- {
- receivedInfo = receivedInfoRaw.Substring(0, receivedInfoRaw.IndexOf("\0"));
- }
- else
- {
- receivedInfo = receivedInfoRaw;
- }
-
- ProcessBrokerMessage(receivedInfo);
- }
- catch(SocketException)
- {
- // There was no multicast message sent before the timeout expired...Let us try again.
- }
-
- //We need to clear the buffer.
- buffer[0] = 0x0;
- ExpireOldServices();
- }
- }
-
- private void ProcessBrokerMessage(string message)
- {
- string payload;
- string brokerName;
- string serviceName;
-
- if(message.StartsWith(MulticastType))
- {
- payload = message.Substring(MulticastType.Length);
- brokerName = GetBrokerName(payload);
- serviceName = GetServiceName(payload);
-
- if(payload.StartsWith(ALIVE))
- {
- ProcessAliveBrokerMessage(brokerName, serviceName);
- }
- else if(payload.StartsWith(DEAD))
- {
- ProcessDeadBrokerMessage(brokerName, serviceName);
- }
- else
- {
- //Malformed Payload
- }
- }
- }
-
- private void ProcessDeadBrokerMessage(string brokerName, string serviceName)
- {
- if(remoteBrokers.ContainsKey(serviceName))
- {
- remoteBrokers.Remove(serviceName);
- if(onServiceRemoved != null)
- {
- onServiceRemoved(brokerName, serviceName);
- }
- }
- }
-
- private void ProcessAliveBrokerMessage(string brokerName, string serviceName)
- {
- if(remoteBrokers.ContainsKey(serviceName))
- {
- remoteBrokers[serviceName].UpdateHeartBeat();
- }
- else
- {
- remoteBrokers.Add(serviceName, new RemoteBrokerData(brokerName, serviceName));
-
- if(onNewServiceFound != null)
- {
- onNewServiceFound(brokerName, serviceName);
- }
- }
- }
-
- private static string GetBrokerName(string payload)
- {
- string[] results = payload.Split(DELIMITER);
- return results[1];
- }
-
- private static string GetServiceName(string payload)
- {
- string[] results = payload.Split(DELIMITER);
- return results[2];
- }
-
- private void ExpireOldServices()
- {
- DateTime expireTime;
- List<RemoteBrokerData> deadServices = new List<RemoteBrokerData>();
-
- foreach(KeyValuePair<string, RemoteBrokerData> brokerService in remoteBrokers)
- {
- expireTime = brokerService.Value.lastHeartBeat.AddSeconds(EXPIRATION_OFFSET_IN_SECONDS);
- if(DateTime.Now > expireTime)
- {
- deadServices.Add(brokerService.Value);
- }
- }
-
- // Remove all of the dead services
- for(int i = 0; i < deadServices.Count; i++)
- {
- ProcessDeadBrokerMessage(deadServices[i].brokerName, deadServices[i].serviceName);
- }
- }
-
- #region Properties
-
- /// <summary>
- /// This property indicates whether or not async send is enabled.
- /// </summary>
- public Uri DiscoveryURI
- {
- get { return discoveryUri; }
- set { discoveryUri = value; }
- }
-
- public bool IsStarted
- {
- get { return isStarted; }
- }
-
- public string Group
- {
- get { return group; }
- set { group = value; }
- }
-
- #endregion
-
- internal string MulticastType
- {
- get { return group + "." + TYPE_SUFFIX; }
- }
-
- internal event NewBrokerServiceFound OnNewServiceFound
- {
- add { onNewServiceFound += value; }
- remove { onNewServiceFound -= value; }
- }
-
- internal event BrokerServiceRemoved OnServiceRemoved
- {
- add { onServiceRemoved += value; }
- remove { onServiceRemoved += value; }
- }
-
- public void Dispose()
- {
- if(isStarted)
- {
- Stop();
- }
-
- multicastSocket.Shutdown(SocketShutdown.Both);
- multicastSocket = null;
- }
-
- internal class RemoteBrokerData
- {
- internal string brokerName;
- internal string serviceName;
- internal DateTime lastHeartBeat;
-
- internal RemoteBrokerData(string brokerName, string serviceName)
- {
- this.brokerName = brokerName;
- this.serviceName = serviceName;
- this.lastHeartBeat = DateTime.Now;
- }
-
- internal void UpdateHeartBeat()
- {
- this.lastHeartBeat = DateTime.Now;
- }
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
+{
+ internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
+ internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
+
+ internal class MulticastDiscoveryAgent : IDisposable
+ {
+ public const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
+ public const int DEFAULT_BACKOFF_MILLISECONDS = 100;
+ public const int BACKOFF_MULTIPLIER = 2;
+ public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
+ public const string DEFAULT_HOST_STR = "default";
+ public const string DEFAULT_HOST_IP = "239.255.2.3";
+ public const int DEFAULT_PORT = 6155;
+
+ private const string TYPE_SUFFIX = "ActiveMQ-4.";
+ private const string ALIVE = "alive";
+ private const string DEAD = "dead";
+ private const char DELIMITER = '%';
+ private const int BUFF_SIZE = 8192;
+ private const string DEFAULT_GROUP = "default";
+ private const int EXPIRATION_OFFSET_IN_SECONDS = 2;
+ private const int WORKER_KILL_TIME_SECONDS = 10;
+ private const int SOCKET_TIMEOUT_MILLISECONDS = 500;
+
+ private string group;
+ private readonly object stopstartSemaphore = new object();
+ private bool isStarted = false;
+ private Uri discoveryUri;
+ private Socket multicastSocket;
+ private IPEndPoint endPoint;
+ private Thread worker;
+
+ private event NewBrokerServiceFound onNewServiceFound;
+ private event BrokerServiceRemoved onServiceRemoved;
+
+ /// <summary>
+ /// Indexed by service name
+ /// </summary>
+ private readonly Dictionary<string, RemoteBrokerData> remoteBrokers;
+
+ public MulticastDiscoveryAgent()
+ {
+ group = DEFAULT_GROUP;
+ remoteBrokers = new Dictionary<string, RemoteBrokerData>();
+ }
+
+ public void Start()
+ {
+ lock(stopstartSemaphore)
+ {
+ if (discoveryUri == null || discoveryUri.Host.Equals(DEFAULT_HOST_STR))
+ {
+ discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
+ }
+
+ if(multicastSocket == null)
+ {
+ int numFailedAttempts = 0;
+ int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
+
+ Tracer.Info("Connecting to multicast discovery socket.");
+ while(!TryToConnectSocket())
+ {
+ numFailedAttempts++;
+ if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
+ {
+ throw new ApplicationException(
+ "Could not open the socket in order to discover advertising brokers.");
+ }
+
+ Thread.Sleep(backoffTime);
+ backoffTime *= BACKOFF_MULTIPLIER;
+ }
+ }
+
+ if(worker == null)
+ {
+ Tracer.Info("Starting multicast discovery agent worker thread");
+ worker = new Thread(new ThreadStart(worker_DoWork));
+ worker.IsBackground = true;
+ worker.Start();
+ isStarted = true;
+ }
+ }
+ }
+
+ public void Stop()
+ {
+ Thread localThread = null;
+
+ lock(stopstartSemaphore)
+ {
+ Tracer.Info("Stopping multicast discovery agent worker thread");
+ localThread = worker;
+ worker = null;
+ // Changing the isStarted flag will signal the thread that it needs to shut down.
+ isStarted = false;
+ }
+
+ if(localThread != null)
+ {
+ // wait for the worker to stop.
+ if(!localThread.Join(WORKER_KILL_TIME_SECONDS))
+ {
+ Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
+ localThread.Abort();
+ }
+ }
+
+ Tracer.Info("Multicast discovery agent worker thread joined");
+ }
+
+ private bool TryToConnectSocket()
+ {
+ bool hasSucceeded = false;
+
+ try
+ {
+ multicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
+ endPoint = new IPEndPoint(IPAddress.Any, discoveryUri.Port);
+
+ //We have to allow reuse in the multicast socket. Otherwise, we would be unable to use multiple clients on the same machine.
+ multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
+ multicastSocket.Bind(endPoint);
+
+ IPAddress ipaddress;
+
+ if(!TcpTransportFactory.TryParseIPAddress(discoveryUri.Host, out ipaddress))
+ {
+ ipaddress = TcpTransportFactory.GetIPAddress(discoveryUri.Host, AddressFamily.InterNetwork);
+ if(null == ipaddress)
+ {
+ throw new NMSConnectionException("Invalid host address.");
+ }
+ }
+
+ multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
+ new MulticastOption(ipaddress, IPAddress.Any));
+#if !NETCF
+ multicastSocket.ReceiveTimeout = SOCKET_TIMEOUT_MILLISECONDS;
+#endif
+ hasSucceeded = true;
+ }
+ catch(SocketException)
+ {
+ }
+
+ return hasSucceeded;
+ }
+
+ private void worker_DoWork()
+ {
+ Thread.CurrentThread.Name = "Discovery Agent Thread.";
+ byte[] buffer = new byte[BUFF_SIZE];
+ string receivedInfoRaw;
+ string receivedInfo;
+
+ while(isStarted)
+ {
+ try
+ {
+ int numBytes = multicastSocket.Receive(buffer);
+ receivedInfoRaw = System.Text.Encoding.UTF8.GetString(buffer, 0, numBytes);
+ // We have to remove all of the null bytes if there are any otherwise we just
+ // take the whole string as is.
+ if (receivedInfoRaw.IndexOf("\0") != -1)
+ {
+ receivedInfo = receivedInfoRaw.Substring(0, receivedInfoRaw.IndexOf("\0"));
+ }
+ else
+ {
+ receivedInfo = receivedInfoRaw;
+ }
+
+ ProcessBrokerMessage(receivedInfo);
+ }
+ catch(SocketException)
+ {
+ // There was no multicast message sent before the timeout expired...Let us try again.
+ }
+
+ //We need to clear the buffer.
+ buffer[0] = 0x0;
+ ExpireOldServices();
+ }
+ }
+
+ private void ProcessBrokerMessage(string message)
+ {
+ string payload;
+ string brokerName;
+ string serviceName;
+
+ if(message.StartsWith(MulticastType))
+ {
+ payload = message.Substring(MulticastType.Length);
+ brokerName = GetBrokerName(payload);
+ serviceName = GetServiceName(payload);
+
+ if(payload.StartsWith(ALIVE))
+ {
+ ProcessAliveBrokerMessage(brokerName, serviceName);
+ }
+ else if(payload.StartsWith(DEAD))
+ {
+ ProcessDeadBrokerMessage(brokerName, serviceName);
+ }
+ else
+ {
+ //Malformed Payload
+ }
+ }
+ }
+
+ private void ProcessDeadBrokerMessage(string brokerName, string serviceName)
+ {
+ if(remoteBrokers.ContainsKey(serviceName))
+ {
+ remoteBrokers.Remove(serviceName);
+ if(onServiceRemoved != null)
+ {
+ onServiceRemoved(brokerName, serviceName);
+ }
+ }
+ }
+
+ private void ProcessAliveBrokerMessage(string brokerName, string serviceName)
+ {
+ if(remoteBrokers.ContainsKey(serviceName))
+ {
+ remoteBrokers[serviceName].UpdateHeartBeat();
+ }
+ else
+ {
+ remoteBrokers.Add(serviceName, new RemoteBrokerData(brokerName, serviceName));
+
+ if(onNewServiceFound != null)
+ {
+ onNewServiceFound(brokerName, serviceName);
+ }
+ }
+ }
+
+ private static string GetBrokerName(string payload)
+ {
+ string[] results = payload.Split(DELIMITER);
+ return results[1];
+ }
+
+ private static string GetServiceName(string payload)
+ {
+ string[] results = payload.Split(DELIMITER);
+ return results[2];
+ }
+
+ private void ExpireOldServices()
+ {
+ DateTime expireTime;
+ List<RemoteBrokerData> deadServices = new List<RemoteBrokerData>();
+
+ foreach(KeyValuePair<string, RemoteBrokerData> brokerService in remoteBrokers)
+ {
+ expireTime = brokerService.Value.lastHeartBeat.AddSeconds(EXPIRATION_OFFSET_IN_SECONDS);
+ if(DateTime.Now > expireTime)
+ {
+ deadServices.Add(brokerService.Value);
+ }
+ }
+
+ // Remove all of the dead services
+ for(int i = 0; i < deadServices.Count; i++)
+ {
+ ProcessDeadBrokerMessage(deadServices[i].brokerName, deadServices[i].serviceName);
+ }
+ }
+
+ #region Properties
+
+ /// <summary>
+ /// This property indicates whether or not async send is enabled.
+ /// </summary>
+ public Uri DiscoveryURI
+ {
+ get { return discoveryUri; }
+ set { discoveryUri = value; }
+ }
+
+ public bool IsStarted
+ {
+ get { return isStarted; }
+ }
+
+ public string Group
+ {
+ get { return group; }
+ set { group = value; }
+ }
+
+ #endregion
+
+ internal string MulticastType
+ {
+ get { return group + "." + TYPE_SUFFIX; }
+ }
+
+ internal event NewBrokerServiceFound OnNewServiceFound
+ {
+ add { onNewServiceFound += value; }
+ remove { onNewServiceFound -= value; }
+ }
+
+ internal event BrokerServiceRemoved OnServiceRemoved
+ {
+ add { onServiceRemoved += value; }
+ remove { onServiceRemoved += value; }
+ }
+
+ public void Dispose()
+ {
+ if(isStarted)
+ {
+ Stop();
+ }
+
+ multicastSocket.Shutdown(SocketShutdown.Both);
+ multicastSocket = null;
+ }
+
+ internal class RemoteBrokerData
+ {
+ internal string brokerName;
+ internal string serviceName;
+ internal DateTime lastHeartBeat;
+
+ internal RemoteBrokerData(string brokerName, string serviceName)
+ {
+ this.brokerName = brokerName;
+ this.serviceName = serviceName;
+ this.lastHeartBeat = DateTime.Now;
+ }
+
+ internal void UpdateHeartBeat()
+ {
+ this.lastHeartBeat = DateTime.Now;
+ }
+ }
+ }
+}