You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/07/09 19:55:03 UTC
[3/5] activemq-nms-amqp git commit: AMQNET-576 Remove deprecated AMQP
client impl from master
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
deleted file mode 100644
index e45ca3b..0000000
--- a/src/main/csharp/MessageConsumer.cs
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Threading;
-using Apache.NMS.Util;
-using Org.Apache.Qpid.Messaging;
-
-namespace Apache.NMS.Amqp
-{
- /// <summary>
- /// An object capable of receiving messages from some destination
- /// </summary>
- public class MessageConsumer : IMessageConsumer
- {
- /// <summary>
- /// Private object used for synchronization, instead of public "this"
- /// </summary>
- private readonly object myLock = new object();
-
- protected TimeSpan zeroTimeout = new TimeSpan(0);
-
- private readonly Session session;
- private readonly int id;
- private readonly Destination destination;
- private Destination replyToDestination;
- private readonly AcknowledgementMode acknowledgementMode;
- private event MessageListener listener;
- private int listenerCount = 0;
- private Thread asyncDeliveryThread = null;
- private AutoResetEvent pause = new AutoResetEvent(false);
- private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
-
- private readonly Atomic<bool> started = new Atomic<bool>(false);
- private Org.Apache.Qpid.Messaging.Receiver qpidReceiver = null;
-
- private ConsumerTransformerDelegate consumerTransformer;
- public ConsumerTransformerDelegate ConsumerTransformer
- {
- get { return this.consumerTransformer; }
- set { this.consumerTransformer = value; }
- }
-
- public MessageConsumer(Session session, int consumerId, Destination dest, AcknowledgementMode acknowledgementMode)
- {
- this.session = session;
- this.id = consumerId;
- this.destination = dest;
- this.acknowledgementMode = acknowledgementMode;
- }
-
- #region IStartable Methods
- public void Start()
- {
- // Don't try creating receiver if session not yet up
- if (!session.IsStarted)
- {
- throw new SessionClosedException();
- }
-
- if (started.CompareAndSet(false, true))
- {
- try
- {
- // Create qpid receiver
- Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString());
- if (qpidReceiver == null)
- {
- qpidReceiver = session.CreateQpidReceiver(destination.Address);
- // Recover replyTo address from qpid receiver and set as the
- // replyTo destination for received messages.
- Address replyTo = qpidReceiver.GetAddress();
- if (destination.IsQueue)
- {
- Queue queue = new Queue(replyTo.Name, replyTo.Subject, replyTo.Options);
- replyToDestination = (Destination)queue;
- }
- else if (destination.IsTopic)
- {
- Topic topic = new Topic(replyTo.Name, replyTo.Subject, replyTo.Options);
- replyToDestination = (Destination)topic;
- }
- }
- }
- catch (Org.Apache.Qpid.Messaging.QpidException e)
- {
- throw new NMSException("Failed to create Qpid Receiver : " + e.Message);
- }
- }
- }
-
- public bool IsStarted
- {
- get { return started.Value; }
- }
- #endregion
-
- #region IStoppable Methods
- public void Stop()
- {
- if (started.CompareAndSet(true, false))
- {
- try
- {
- Tracer.DebugFormat("Stop Consumer Id = " + ConsumerId);
- qpidReceiver.Close();
- qpidReceiver.Dispose();
- qpidReceiver = null;
- }
- catch (Org.Apache.Qpid.Messaging.QpidException e)
- {
- throw new NMSException("Failed to close consumer with Id " + ConsumerId.ToString() + " : " + e.Message);
- }
- }
- }
- #endregion
-
- public event MessageListener Listener
- {
- add
- {
- listener += value;
- listenerCount++;
- StartAsyncDelivery();
- }
-
- remove
- {
- if(listenerCount > 0)
- {
- listener -= value;
- listenerCount--;
- }
-
- if(0 == listenerCount)
- {
- StopAsyncDelivery();
- }
- }
- }
-
-
- /// <summary>
- /// Fetch a message from Qpid Receiver.
- /// Will wait FOREVER.
- /// </summary>
- /// <returns>NMS message or null if Fetch fails</returns>
- public IMessage Receive()
- {
- return ReceiveQpid(DurationConstants.FORVER);
- }
-
-
- /// <summary>
- /// Fetch a message from Qpid Receiver
- /// Will wait for given timespan before abandoning the Fetch.
- /// </summary>
- /// <param name="timeout"></param>
- /// <returns>>NMS message or null if Fetch fails or times out</returns>
- public IMessage Receive(TimeSpan timeout)
- {
- return ReceiveQpid(DefaultMessageConverter.ToQpidDuration(timeout));
- }
-
-
- /// <summary>
- /// Fetch a message from Qpid Receiver
- /// Returns from the Fetch immediately.
- /// </summary>
- /// <returns>NMS message or null if none was pending</returns>
- public IMessage ReceiveNoWait()
- {
- return ReceiveQpid(DurationConstants.IMMEDIATE);
- }
-
-
-
- private IMessage ReceiveQpid(Org.Apache.Qpid.Messaging.Duration timeout)
- {
- IMessage nmsMessage = null;
-
- Message qpidMessage = new Message();
- if (qpidReceiver.Fetch(ref qpidMessage, timeout))
- {
- nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage);
- nmsMessage.NMSReplyTo = replyToDestination;
- if (this.session.IsAutoAcknowledge)
- {
- this.session.Acknowledge();
- }
- }
- return nmsMessage;
- }
-
-
- public void Dispose()
- {
- Close();
- }
-
- public void Close()
- {
- StopAsyncDelivery();
- Stop();
- }
-
- protected virtual void StopAsyncDelivery()
- {
- if(asyncDelivery.CompareAndSet(true, false))
- {
- if(null != asyncDeliveryThread)
- {
- Tracer.Info("Stopping async delivery thread.");
- pause.Set();
- if(!asyncDeliveryThread.Join(10000))
- {
- Tracer.Info("Aborting async delivery thread.");
- asyncDeliveryThread.Abort();
- }
-
- asyncDeliveryThread = null;
- Tracer.Info("Async delivery thread stopped.");
- }
- }
- }
-
- protected virtual void StartAsyncDelivery()
- {
- if(asyncDelivery.CompareAndSet(false, true))
- {
- asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
- asyncDeliveryThread.Name = "Message Consumer Dispatch: " + asyncDeliveryThread.ManagedThreadId.ToString();
- asyncDeliveryThread.IsBackground = true;
- asyncDeliveryThread.Start();
- }
- }
-
- protected virtual void DispatchLoop()
- {
- Tracer.Info("Starting dispatcher thread consumer: " + this);
- while(asyncDelivery.Value)
- {
- try
- {
- IMessage message = Receive();
- if(asyncDelivery.Value && message != null)
- {
- try
- {
- listener(message);
- }
- catch(Exception e)
- {
- HandleAsyncException(e);
- }
- }
- }
- catch(ThreadAbortException ex)
- {
- Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
- break;
- }
- catch(Exception ex)
- {
- Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
- }
- }
- Tracer.Info("Stopping dispatcher thread consumer: " + this);
- }
-
- protected virtual void HandleAsyncException(Exception e)
- {
- session.Connection.HandleException(e);
- }
-
- protected virtual IMessage ToNmsMessage(Message message)
- {
- if(message == null)
- {
- return null;
- }
-
- IMessage converted = session.MessageConverter.ToNmsMessage(message);
-
- if(this.ConsumerTransformer != null)
- {
- IMessage newMessage = ConsumerTransformer(this.session, this, converted);
- if(newMessage != null)
- {
- converted = newMessage;
- }
- }
-
- return converted;
- }
-
- public int ConsumerId
- {
- get { return id; }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/MessageProducer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
deleted file mode 100644
index 7ac633a..0000000
--- a/src/main/csharp/MessageProducer.cs
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Threading;
-using Apache.NMS.Util;
-using Org.Apache.Qpid.Messaging;
-
-namespace Apache.NMS.Amqp
-{
- /// <summary>
- /// An object capable of sending messages to some destination
- /// </summary>
- public class MessageProducer : IMessageProducer
- {
- /// <summary>
- /// Private object used for synchronization, instead of public "this"
- /// </summary>
- private readonly object myLock = new object();
-
- private readonly Session session;
- private readonly int id;
- private Destination destination;
-
- //private long messageCounter;
- private MsgDeliveryMode deliveryMode;
- private TimeSpan timeToLive;
- private MsgPriority priority;
- private bool disableMessageID;
- private bool disableMessageTimestamp;
-
- //private IMessageConverter messageConverter;
-
- private readonly Atomic<bool> started = new Atomic<bool>(false);
- private Org.Apache.Qpid.Messaging.Sender qpidSender = null;
-
- private ProducerTransformerDelegate producerTransformer;
- public ProducerTransformerDelegate ProducerTransformer
- {
- get { return this.producerTransformer; }
- set { this.producerTransformer = value; }
- }
-
- public MessageProducer(Session session, int producerId, Destination destination)
- {
- this.session = session;
- this.id = producerId;
- this.destination = destination;
- }
-
- #region IStartable Methods
- public void Start()
- {
- // Don't try creating session if connection not yet up
- if (!session.IsStarted)
- {
- throw new SessionClosedException();
- }
-
- if (started.CompareAndSet(false, true))
- {
- try
- {
- // Create qpid sender
- Tracer.DebugFormat("Start Producer Id = " + ProducerId.ToString());
- if (qpidSender == null)
- {
- qpidSender = session.CreateQpidSender(destination.Address);
- }
- }
- catch (Org.Apache.Qpid.Messaging.QpidException e)
- {
- throw new NMSException("Failed to create Qpid Sender : " + e.Message);
- }
- }
- }
-
- public bool IsStarted
- {
- get { return started.Value; }
- }
- #endregion
-
- #region IStoppable Methods
- public void Stop()
- {
- if (started.CompareAndSet(true, false))
- {
- try
- {
- Tracer.DebugFormat("Stop Producer Id = " + ProducerId);
- qpidSender.Close();
- qpidSender.Dispose();
- qpidSender = null;
- }
- catch (Org.Apache.Qpid.Messaging.QpidException e)
- {
- throw new NMSException("Failed to close producer with Id " + ProducerId.ToString() + " : " + e.Message);
- }
- }
- }
- #endregion
-
- public void Send(IMessage message)
- {
- Send(Destination, message);
- }
-
- public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
- {
- Send(Destination, message, deliveryMode, priority, timeToLive);
- }
-
- public void Send(IDestination destination, IMessage message)
- {
- Send(destination, message, DeliveryMode, Priority, TimeToLive);
- }
-
- public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
- {
- try
- {
- if (this.ProducerTransformer != null)
- {
- IMessage transformed = this.ProducerTransformer(this.session, this, message);
- if (transformed != null)
- {
- message = transformed;
- }
- }
-
- message.NMSDeliveryMode = deliveryMode;
- message.NMSTimeToLive = timeToLive;
- message.NMSPriority = priority;
- if (!DisableMessageTimestamp)
- {
- message.NMSTimestamp = DateTime.UtcNow;
- }
-
- if (!DisableMessageID)
- {
- // TODO: message.NMSMessageId =
- }
-
- // Convert the Message into a Amqp message
- Message msg = session.MessageConverter.ToAmqpMessage(message);
-
- qpidSender.Send(msg);
- }
- catch (Exception e)
- {
- throw new NMSException(e.Message + ": " /* TODO: + dest */, e);
- }
- }
-
- public void Close()
- {
- Stop();
- }
-
- public void Dispose()
- {
- Close();
- }
-
- public IMessage CreateMessage()
- {
- return session.CreateMessage();
- }
-
- public ITextMessage CreateTextMessage()
- {
- return session.CreateTextMessage();
- }
-
- public ITextMessage CreateTextMessage(String text)
- {
- return session.CreateTextMessage(text);
- }
-
- public IMapMessage CreateMapMessage()
- {
- return session.CreateMapMessage();
- }
-
- public IObjectMessage CreateObjectMessage(Object body)
- {
- return session.CreateObjectMessage(body);
- }
-
- public IBytesMessage CreateBytesMessage()
- {
- return session.CreateBytesMessage();
- }
-
- public IBytesMessage CreateBytesMessage(byte[] body)
- {
- return session.CreateBytesMessage(body);
- }
-
- public IStreamMessage CreateStreamMessage()
- {
- return session.CreateStreamMessage();
- }
-
- public MsgDeliveryMode DeliveryMode
- {
- get { return deliveryMode; }
- set { deliveryMode = value; }
- }
-
- public TimeSpan TimeToLive
- {
- get { return timeToLive; }
- set { timeToLive = value; }
- }
-
- /// <summary>
- /// The default timeout for network requests.
- /// </summary>
- public TimeSpan RequestTimeout
- {
- get { return NMSConstants.defaultRequestTimeout; }
- set { }
- }
-
- public IDestination Destination
- {
- get { return destination; }
- set { destination = (Destination) value; }
- }
-
- public MsgPriority Priority
- {
- get { return priority; }
- set { priority = value; }
- }
-
- public bool DisableMessageID
- {
- get { return disableMessageID; }
- set { disableMessageID = value; }
- }
-
- public bool DisableMessageTimestamp
- {
- get { return disableMessageTimestamp; }
- set { disableMessageTimestamp = value; }
- }
-
- public int ProducerId
- {
- get { return id; }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/ObjectMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/ObjectMessage.cs b/src/main/csharp/ObjectMessage.cs
deleted file mode 100644
index 8935d41..0000000
--- a/src/main/csharp/ObjectMessage.cs
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System.IO;
-
-#if !(PocketPC||NETCF||NETCF_2_0)
-using System.Runtime.Serialization;
-using System.Runtime.Serialization.Formatters.Binary;
-#endif
-
-// TODO: Any support
-
-namespace Apache.NMS.Amqp
-{
- public class ObjectMessage : BaseMessage, IObjectMessage
- {
- private object body;
-#if !(PocketPC||NETCF||NETCF_2_0)
- private IFormatter formatter;
-#endif
-
- public ObjectMessage()
- {
- }
-
- public ObjectMessage(object body)
- {
- this.body = body;
- }
-
- public object Body
- {
- get
- {
-#if !(PocketPC||NETCF||NETCF_2_0)
- if(body == null)
- {
- body = Formatter.Deserialize(new MemoryStream(Content));
- }
-#else
-#endif
- return body;
- }
-
- set
- {
-#if !(PocketPC||NETCF||NETCF_2_0)
- body = value;
-#else
- throw new NotImplementedException();
-#endif
- }
- }
-
-
-#if !(PocketPC||NETCF||NETCF_2_0)
- public IFormatter Formatter
- {
- get
- {
- if(formatter == null)
- {
- formatter = new BinaryFormatter();
- }
- return formatter;
- }
-
- set
- {
- formatter = value;
- }
- }
-
-#endif
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/Queue.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Queue.cs b/src/main/csharp/Queue.cs
deleted file mode 100644
index 463f341..0000000
--- a/src/main/csharp/Queue.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-
-// Typedef for options map
-using OptionsMap = System.Collections.Generic.Dictionary<System.String, System.Object>;
-
-namespace Apache.NMS.Amqp
-{
-
- /// <summary>
- /// Summary description for Queue.
- /// </summary>
- public class Queue : Destination, IQueue
- {
-
- public Queue()
- : base()
- {
- }
-
- public Queue(String name)
- : base(name)
- {
- }
-
- public Queue(String name, string subject, OptionsMap options)
- : base(name, subject, options, "queue")
- {
- }
-
- override public DestinationType DestinationType
- {
- get
- {
- return DestinationType.Queue;
- }
- }
-
- public String QueueName
- {
- get { return Path; }
- }
-
-
- public override Destination CreateDestination(String name)
- {
- return new Queue(name);
- }
-
- public override Destination CreateDestination(String name, string subject, OptionsMap options)
- {
- return new Queue(name, subject, options);
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/Session.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
deleted file mode 100644
index 736b4ef..0000000
--- a/src/main/csharp/Session.cs
+++ /dev/null
@@ -1,659 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Threading;
-using Apache.NMS.Util;
-using Org.Apache.Qpid.Messaging;
-
-// Typedef for options map
-using OptionsMap = System.Collections.Generic.Dictionary<System.String, System.Object>;
-
-namespace Apache.NMS.Amqp
-{
- /// <summary>
- /// Amqp provider of ISession
- /// </summary>
- public class Session : ISession, IStartable, IStoppable
- {
- /// <summary>
- /// Private object used for synchronization, instead of public "this"
- /// </summary>
- private readonly object myLock = new object();
-
- private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
- private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
-
- private Connection connection;
- private AcknowledgementMode acknowledgementMode;
- private IMessageConverter messageConverter;
- private readonly int id;
-
- private int consumerCounter;
- private int producerCounter;
- private long nextDeliveryId;
- private long lastDeliveredSequenceId;
- private readonly object sessionLock = new object();
- private readonly Atomic<bool> started = new Atomic<bool>(false);
- protected bool disposed = false;
- protected bool closed = false;
- protected bool closing = false;
- private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
- private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
- private TimeSpan requestTimeout;
-
- private Org.Apache.Qpid.Messaging.Session qpidSession = null; // Don't create until Start()
-
- public Session(Connection connection, int sessionId, AcknowledgementMode acknowledgementMode)
- {
- this.connection = connection;
- this.acknowledgementMode = acknowledgementMode;
- MessageConverter = connection.MessageConverter;
- id = sessionId;
- if (this.acknowledgementMode == AcknowledgementMode.Transactional)
- {
- // TODO: transactions
- throw new NotSupportedException("Transactions are not supported by Qpid/Amqp");
- }
- else if (acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge)
- {
- this.acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
- }
- if (connection.IsStarted)
- {
- this.Start();
- }
- connection.AddSession(this);
- }
-
- public AcknowledgementMode AcknowledgementMode
- {
- get { return this.acknowledgementMode; }
- }
-
- public bool IsClientAcknowledge
- {
- get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
- }
-
- public bool IsAutoAcknowledge
- {
- get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
- }
-
- public bool IsDupsOkAcknowledge
- {
- get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
- }
-
- public bool IsIndividualAcknowledge
- {
- get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; }
- }
-
- public bool IsTransacted
- {
- get { return this.acknowledgementMode == AcknowledgementMode.Transactional; }
- }
-
- #region IStartable Methods
- /// <summary>
- /// Create new unmanaged session and start senders and receivers
- /// Associated connection must be open.
- /// </summary>
- public void Start()
- {
- // Don't try creating session if connection not yet up
- if (!connection.IsStarted)
- {
- throw new ConnectionClosedException();
- }
-
- if (started.CompareAndSet(false, true))
- {
- try
- {
- // Create qpid session
- if (qpidSession == null)
- {
- qpidSession = connection.CreateQpidSession();
- }
-
- // Start producers and consumers
- lock (producers.SyncRoot)
- {
- foreach (MessageProducer producer in producers.Values)
- {
- producer.Start();
- }
- }
- lock (consumers.SyncRoot)
- {
- foreach (MessageConsumer consumer in consumers.Values)
- {
- consumer.Start();
- }
- }
- }
- catch (Org.Apache.Qpid.Messaging.QpidException e)
- {
- throw new SessionClosedException( "Failed to create session : " + e.Message );
- }
- }
- }
-
- public bool IsStarted
- {
- get { return started.Value; }
- }
- #endregion
-
- #region IStoppable Methods
- public void Stop()
- {
- if (started.CompareAndSet(true, false))
- {
- try
- {
- lock (producers.SyncRoot)
- {
- foreach (MessageProducer producer in producers.Values)
- {
- producer.Stop();
- }
- }
- lock (consumers.SyncRoot)
- {
- foreach (MessageConsumer consumer in consumers.Values)
- {
- consumer.Stop();
- }
- }
-
- qpidSession.Dispose();
- qpidSession = null;
- }
- catch (Org.Apache.Qpid.Messaging.QpidException e)
- {
- throw new NMSException("Failed to close session with Id " + SessionId.ToString() + " : " + e.Message);
- }
- }
- }
- #endregion
-
- #region IDisposable Methods
- public void Dispose()
- {
- Dispose(true);
- }
- #endregion
-
-
- protected void Dispose(bool disposing)
- {
- if (this.disposed)
- {
- return;
- }
-
- try
- {
- // Force a Stop when we are Disposing vs a Normal Close.
- Close();
- }
- catch
- {
- // Ignore network errors.
- }
-
- this.disposed = true;
- }
-
- public virtual void Close()
- {
- if (!this.closed)
- {
- try
- {
- Tracer.InfoFormat("Closing The Session with Id {0}", SessionId);
- DoClose();
- Tracer.InfoFormat("Closed The Session with Id {0}", SessionId);
- }
- catch (Exception ex)
- {
- Tracer.ErrorFormat("Error closing Session with id {0} : {1}", SessionId, ex);
- }
- }
- }
-
- internal void DoClose()
- {
- Shutdown();
- }
-
- internal void Shutdown()
- {
- //Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId);
-
- if (this.closed)
- {
- return;
- }
-
- lock (myLock)
- {
- if (this.closed || this.closing)
- {
- return;
- }
-
- try
- {
- this.closing = true;
-
- // Stop all message deliveries from this Session
- lock (consumers.SyncRoot)
- {
- foreach (MessageConsumer consumer in consumers.Values)
- {
- consumer.Close();
- }
- }
- consumers.Clear();
-
- lock (producers.SyncRoot)
- {
- foreach (MessageProducer producer in producers.Values)
- {
- producer.Close();
- }
- }
- producers.Clear();
-
- Connection.RemoveSession(this);
- }
- catch (Exception ex)
- {
- Tracer.ErrorFormat("Error closing Session with Id {0} : {1}", SessionId, ex);
- }
- finally
- {
- this.closed = true;
- this.closing = false;
- }
- }
- }
-
- public IMessageProducer CreateProducer()
- {
- return CreateProducer(null);
- }
-
- public IMessageProducer CreateProducer(IDestination destination)
- {
- if (destination == null)
- {
- throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
- }
- MessageProducer producer = null;
- try
- {
- Queue queue = new Queue(destination.ToString());
- producer = DoCreateMessageProducer(queue);
-
- this.AddProducer(producer);
- }
- catch (Exception)
- {
- if (producer != null)
- {
- this.RemoveProducer(producer.ProducerId);
- producer.Close();
- }
-
- throw;
- }
-
- return producer;
- }
-
- internal virtual MessageProducer DoCreateMessageProducer(Destination destination)
- {
- return new MessageProducer(this, GetNextProducerId(), destination);
- }
-
- public IMessageConsumer CreateConsumer(IDestination destination)
- {
- return CreateConsumer(destination, null, false);
- }
-
- public IMessageConsumer CreateConsumer(IDestination destination, string selector)
- {
- return CreateConsumer(destination, selector, false);
- }
-
- public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
- {
- if (destination == null)
- {
- throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
- }
-
- MessageConsumer consumer = null;
-
- try
- {
- Queue queue = new Queue(destination.ToString());
- consumer = DoCreateMessageConsumer(GetNextConsumerId(), queue, acknowledgementMode);
-
- consumer.ConsumerTransformer = this.ConsumerTransformer;
-
- this.AddConsumer(consumer);
-
- if (this.Connection.IsStarted)
- {
- consumer.Start();
- }
- }
- catch (Exception)
- {
- if (consumer != null)
- {
- this.RemoveConsumer(consumer);
- consumer.Close();
- }
-
- throw;
- }
-
- return consumer;
- }
-
-
- public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
- {
- throw new NotSupportedException("TODO: Durable Consumer");
- }
-
- internal virtual MessageConsumer DoCreateMessageConsumer(int id, Destination destination, AcknowledgementMode mode)
- {
- return new MessageConsumer(this, id, destination, mode);
- }
-
- public void DeleteDurableConsumer(string name)
- {
- throw new NotSupportedException("TODO: Durable Consumer");
- }
-
- public IQueueBrowser CreateBrowser(IQueue queue)
- {
- throw new NotImplementedException();
- }
-
- public IQueueBrowser CreateBrowser(IQueue queue, string selector)
- {
- throw new NotImplementedException();
- }
-
- public IQueue GetQueue(string name)
- {
- return new Queue(name);
- }
-
- public ITopic GetTopic(string name)
- {
- return new Topic(name);
- }
-
- public IQueue GetQueue(string name, string subject, OptionsMap options)
- {
- return new Queue(name, subject, options);
- }
-
- public ITopic GetTopic(string name, string subject, OptionsMap options)
- {
- return new Topic(name, subject, options);
- }
-
- public ITemporaryQueue CreateTemporaryQueue()
- {
- throw new NotSupportedException("TODO: Temp queue");
- }
-
- public ITemporaryTopic CreateTemporaryTopic()
- {
- throw new NotSupportedException("TODO: Temp topic");
- }
-
- /// <summary>
- /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
- /// </summary>
- public void DeleteDestination(IDestination destination)
- {
- // TODO: Implement if possible. If not possible, then change exception to NotSupportedException().
- throw new NotImplementedException();
- }
-
- public IMessage CreateMessage()
- {
- BaseMessage answer = new BaseMessage();
- return answer;
- }
-
-
- public ITextMessage CreateTextMessage()
- {
- TextMessage answer = new TextMessage();
- return answer;
- }
-
- public ITextMessage CreateTextMessage(string text)
- {
- TextMessage answer = new TextMessage(text);
- return answer;
- }
-
- public IMapMessage CreateMapMessage()
- {
- return new MapMessage();
- }
-
- public IBytesMessage CreateBytesMessage()
- {
- return new BytesMessage();
- }
-
- public IBytesMessage CreateBytesMessage(byte[] body)
- {
- BytesMessage answer = new BytesMessage();
- answer.Content = body;
- return answer;
- }
-
- public IStreamMessage CreateStreamMessage()
- {
- return new StreamMessage();
- }
-
- public IObjectMessage CreateObjectMessage(Object body)
- {
- ObjectMessage answer = new ObjectMessage();
- answer.Body = body;
- return answer;
- }
-
- public void Commit()
- {
- throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
- }
-
- public void Rollback()
- {
- throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
- }
-
- public void Recover()
- {
- throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
- }
-
- // Properties
- public Connection Connection
- {
- get { return connection; }
- }
-
- /// <summary>
- /// The default timeout for network requests.
- /// </summary>
- public TimeSpan RequestTimeout
- {
- get { return NMSConstants.defaultRequestTimeout; }
- set { }
- }
-
- public IMessageConverter MessageConverter
- {
- get { return messageConverter; }
- set { messageConverter = value; }
- }
-
- public bool Transacted
- {
- get { return acknowledgementMode == AcknowledgementMode.Transactional; }
- }
-
- private ConsumerTransformerDelegate consumerTransformer;
- public ConsumerTransformerDelegate ConsumerTransformer
- {
- get { return this.consumerTransformer; }
- set { this.consumerTransformer = value; }
- }
-
- private ProducerTransformerDelegate producerTransformer;
- public ProducerTransformerDelegate ProducerTransformer
- {
- get { return this.producerTransformer; }
- set { this.producerTransformer = value; }
- }
-
- public void AddConsumer(MessageConsumer consumer)
- {
- if (!this.closing)
- {
- // Registered with Connection before we register at the broker.
- consumers[consumer.ConsumerId] = consumer;
- }
- }
-
- public void RemoveConsumer(MessageConsumer consumer)
- {
- if (!this.closing)
- {
- consumers.Remove(consumer.ConsumerId);
- }
- }
-
- public void AddProducer(MessageProducer producer)
- {
- if (!this.closing)
- {
- this.producers[producer.ProducerId] = producer;
- }
- }
-
- public void RemoveProducer(int objectId)
- {
- if (!this.closing)
- {
- producers.Remove(objectId);
- }
- }
-
- public int GetNextConsumerId()
- {
- return Interlocked.Increment(ref consumerCounter);
- }
-
- public int GetNextProducerId()
- {
- return Interlocked.Increment(ref producerCounter);
- }
-
- public int SessionId
- {
- get { return id; }
- }
-
-
- public Org.Apache.Qpid.Messaging.Receiver CreateQpidReceiver(Address address)
- {
- if (!IsStarted)
- {
- throw new SessionClosedException();
- }
- return qpidSession.CreateReceiver(address);
- }
-
- public Org.Apache.Qpid.Messaging.Sender CreateQpidSender(Address address)
- {
- if (!IsStarted)
- {
- throw new SessionClosedException();
- }
- return qpidSession.CreateSender(address);
- }
-
- //
- // Acknowledges all outstanding messages that have been received
- // by the application on this session.
- //
- // @param sync if true, blocks until the acknowledgement has been
- // processed by the server
- //
- public void Acknowledge()
- {
- qpidSession.Acknowledge(false);
- }
-
- public void Acknowledge(bool sync)
- {
- qpidSession.Acknowledge(sync);
- }
-
- //
- // These flavors of acknowledge are available in the qpid messaging
- // interface but not exposed to the NMS message/session stack.
- //
- // Acknowledges the specified message.
- //
- // void acknowledge(Message&, bool sync=false);
- //
- // Acknowledges all message up to the specified message.
- //
- // void acknowledgeUpTo(Message&, bool sync=false);
-
- #region Transaction State Events
-
- public event SessionTxEventDelegate TransactionStartedListener;
- public event SessionTxEventDelegate TransactionCommittedListener;
- public event SessionTxEventDelegate TransactionRolledBackListener;
-
- #endregion
-
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/SessionClosedException.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/SessionClosedException.cs b/src/main/csharp/SessionClosedException.cs
deleted file mode 100644
index 6864b50..0000000
--- a/src/main/csharp/SessionClosedException.cs
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-
-namespace Apache.NMS.Amqp
-{
- /// <summary>
- /// Exception thrown when a session is used that it already closed
- /// </summary>
- [Serializable]
- public class SessionClosedException : NMSException
- {
- public SessionClosedException()
- : base("The session is already closed!")
- {
- }
-
- public SessionClosedException(string message)
- : base(message)
- {
- }
-
- public SessionClosedException(string message, string errorCode)
- : base(message, errorCode)
- {
- }
-
- public SessionClosedException(string message, Exception innerException)
- : base(message, innerException)
- {
- }
-
- public SessionClosedException(string message, string errorCode, Exception innerException)
- : base(message, errorCode, innerException)
- {
- }
-
- #region ISerializable interface implementation
-
- /// <summary>
- /// Initializes a new instance of the SessionClosedException class with serialized data.
- /// Throws System.ArgumentNullException if the info parameter is null.
- /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
- /// </summary>
- /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
- /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
- protected SessionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
- : base(info, context)
- {
- }
-
- #endregion
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/StreamMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/StreamMessage.cs b/src/main/csharp/StreamMessage.cs
deleted file mode 100644
index 7e82845..0000000
--- a/src/main/csharp/StreamMessage.cs
+++ /dev/null
@@ -1,895 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.IO;
-using Apache.NMS.Util;
-
-// TODO: Any Amqp support?
-
-namespace Apache.NMS.Amqp
-{
- public class StreamMessage : BaseMessage, IStreamMessage
- {
- private EndianBinaryReader dataIn = null;
- private EndianBinaryWriter dataOut = null;
- private MemoryStream byteBuffer = null;
- private int bytesRemaining = -1;
-
- public bool ReadBoolean()
- {
- InitializeReading();
-
- try
- {
- long startingPos = this.byteBuffer.Position;
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.BOOLEAN_TYPE)
- {
- return this.dataIn.ReadBoolean();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return Boolean.Parse(this.dataIn.ReadString16());
- }
- else if(type == PrimitiveMap.NULL)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new NMSException("Cannot convert Null type to a bool");
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a Boolean type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public byte ReadByte()
- {
- InitializeReading();
-
- try
- {
- long startingPos = this.byteBuffer.Position;
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.BYTE_TYPE)
- {
- return this.dataIn.ReadByte();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return Byte.Parse(this.dataIn.ReadString16());
- }
- else if(type == PrimitiveMap.NULL)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new NMSException("Cannot convert Null type to a byte");
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a Byte type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public char ReadChar()
- {
- InitializeReading();
-
- try
- {
- long startingPos = this.byteBuffer.Position;
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.CHAR_TYPE)
- {
- return this.dataIn.ReadChar();
- }
- else if(type == PrimitiveMap.NULL)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new NMSException("Cannot convert Null type to a char");
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a Char type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public short ReadInt16()
- {
- InitializeReading();
-
- try
- {
- long startingPos = this.byteBuffer.Position;
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.SHORT_TYPE)
- {
- return this.dataIn.ReadInt16();
- }
- else if(type == PrimitiveMap.BYTE_TYPE)
- {
- return this.dataIn.ReadByte();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return Int16.Parse(this.dataIn.ReadString16());
- }
- else if(type == PrimitiveMap.NULL)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new NMSException("Cannot convert Null type to a short");
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a Int16 type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public int ReadInt32()
- {
- InitializeReading();
-
- try
- {
- long startingPos = this.byteBuffer.Position;
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.INTEGER_TYPE)
- {
- return this.dataIn.ReadInt32();
- }
- else if(type == PrimitiveMap.SHORT_TYPE)
- {
- return this.dataIn.ReadInt16();
- }
- else if(type == PrimitiveMap.BYTE_TYPE)
- {
- return this.dataIn.ReadByte();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return Int32.Parse(this.dataIn.ReadString16());
- }
- else if(type == PrimitiveMap.NULL)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new NMSException("Cannot convert Null type to a int");
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a Int32 type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public long ReadInt64()
- {
- InitializeReading();
-
- try
- {
- long startingPos = this.byteBuffer.Position;
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.LONG_TYPE)
- {
- return this.dataIn.ReadInt64();
- }
- else if(type == PrimitiveMap.INTEGER_TYPE)
- {
- return this.dataIn.ReadInt32();
- }
- else if(type == PrimitiveMap.SHORT_TYPE)
- {
- return this.dataIn.ReadInt16();
- }
- else if(type == PrimitiveMap.BYTE_TYPE)
- {
- return this.dataIn.ReadByte();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return Int64.Parse(this.dataIn.ReadString16());
- }
- else if(type == PrimitiveMap.NULL)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new NMSException("Cannot convert Null type to a long");
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a Int64 type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public float ReadSingle()
- {
- InitializeReading();
-
- try
- {
- long startingPos = this.byteBuffer.Position;
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.FLOAT_TYPE)
- {
- return this.dataIn.ReadSingle();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return Single.Parse(this.dataIn.ReadString16());
- }
- else if(type == PrimitiveMap.NULL)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new NMSException("Cannot convert Null type to a float");
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a Single type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public double ReadDouble()
- {
- InitializeReading();
-
- try
- {
- long startingPos = this.byteBuffer.Position;
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.DOUBLE_TYPE)
- {
- return this.dataIn.ReadDouble();
- }
- else if(type == PrimitiveMap.FLOAT_TYPE)
- {
- return this.dataIn.ReadSingle();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return Single.Parse(this.dataIn.ReadString16());
- }
- else if(type == PrimitiveMap.NULL)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new NMSException("Cannot convert Null type to a double");
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a Double type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public string ReadString()
- {
- InitializeReading();
-
- long startingPos = this.byteBuffer.Position;
-
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.BIG_STRING_TYPE)
- {
- return this.dataIn.ReadString32();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return this.dataIn.ReadString16();
- }
- else if(type == PrimitiveMap.LONG_TYPE)
- {
- return this.dataIn.ReadInt64().ToString();
- }
- else if(type == PrimitiveMap.INTEGER_TYPE)
- {
- return this.dataIn.ReadInt32().ToString();
- }
- else if(type == PrimitiveMap.SHORT_TYPE)
- {
- return this.dataIn.ReadInt16().ToString();
- }
- else if(type == PrimitiveMap.FLOAT_TYPE)
- {
- return this.dataIn.ReadSingle().ToString();
- }
- else if(type == PrimitiveMap.DOUBLE_TYPE)
- {
- return this.dataIn.ReadDouble().ToString();
- }
- else if(type == PrimitiveMap.CHAR_TYPE)
- {
- return this.dataIn.ReadChar().ToString();
- }
- else if(type == PrimitiveMap.BYTE_TYPE)
- {
- return this.dataIn.ReadByte().ToString();
- }
- else if(type == PrimitiveMap.BOOLEAN_TYPE)
- {
- return this.dataIn.ReadBoolean().ToString();
- }
- else if(type == PrimitiveMap.NULL)
- {
- return null;
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a known type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public int ReadBytes(byte[] value)
- {
- InitializeReading();
-
- if(value == null)
- {
- throw new NullReferenceException("Passed Byte Array is null");
- }
-
- try
- {
- if(this.bytesRemaining == -1)
- {
- long startingPos = this.byteBuffer.Position;
- byte type = this.dataIn.ReadByte();
-
- if(type != PrimitiveMap.BYTE_ARRAY_TYPE)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Not a byte array");
- }
-
- this.bytesRemaining = this.dataIn.ReadInt32();
- }
- else if(this.bytesRemaining == 0)
- {
- this.bytesRemaining = -1;
- return -1;
- }
-
- if(value.Length <= this.bytesRemaining)
- {
- // small buffer
- this.bytesRemaining -= value.Length;
- this.dataIn.Read(value, 0, value.Length);
- return value.Length;
- }
- else
- {
- // big buffer
- int rc = this.dataIn.Read(value, 0, this.bytesRemaining);
- this.bytesRemaining = 0;
- return rc;
- }
- }
- catch(EndOfStreamException ex)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(ex);
- }
- catch(IOException ex)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(ex);
- }
- }
-
- public Object ReadObject()
- {
- InitializeReading();
-
- long startingPos = this.byteBuffer.Position;
-
- try
- {
- int type = this.dataIn.ReadByte();
-
- if(type == PrimitiveMap.BIG_STRING_TYPE)
- {
- return this.dataIn.ReadString32();
- }
- else if(type == PrimitiveMap.STRING_TYPE)
- {
- return this.dataIn.ReadString16();
- }
- else if(type == PrimitiveMap.LONG_TYPE)
- {
- return this.dataIn.ReadInt64();
- }
- else if(type == PrimitiveMap.INTEGER_TYPE)
- {
- return this.dataIn.ReadInt32();
- }
- else if(type == PrimitiveMap.SHORT_TYPE)
- {
- return this.dataIn.ReadInt16();
- }
- else if(type == PrimitiveMap.FLOAT_TYPE)
- {
- return this.dataIn.ReadSingle();
- }
- else if(type == PrimitiveMap.DOUBLE_TYPE)
- {
- return this.dataIn.ReadDouble();
- }
- else if(type == PrimitiveMap.CHAR_TYPE)
- {
- return this.dataIn.ReadChar();
- }
- else if(type == PrimitiveMap.BYTE_TYPE)
- {
- return this.dataIn.ReadByte();
- }
- else if(type == PrimitiveMap.BOOLEAN_TYPE)
- {
- return this.dataIn.ReadBoolean();
- }
- else if(type == PrimitiveMap.BYTE_ARRAY_TYPE)
- {
- int length = this.dataIn.ReadInt32();
- byte[] data = new byte[length];
- this.dataIn.Read(data, 0, length);
- return data;
- }
- else if(type == PrimitiveMap.NULL)
- {
- return null;
- }
- else
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw new MessageFormatException("Value is not a known type.");
- }
- }
- catch(FormatException e)
- {
- this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- catch(EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
- }
- catch(IOException e)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(e);
- }
- }
-
- public void WriteBoolean(bool value)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.BOOLEAN_TYPE);
- this.dataOut.Write(value);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteByte(byte value)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.BYTE_TYPE);
- this.dataOut.Write(value);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteBytes(byte[] value)
- {
- InitializeWriting();
- this.WriteBytes(value, 0, value.Length);
- }
-
- public void WriteBytes(byte[] value, int offset, int length)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.BYTE_ARRAY_TYPE);
- this.dataOut.Write((int) length);
- this.dataOut.Write(value, offset, length);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteChar(char value)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.CHAR_TYPE);
- this.dataOut.Write(value);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteInt16(short value)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.SHORT_TYPE);
- this.dataOut.Write(value);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteInt32(int value)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.INTEGER_TYPE);
- this.dataOut.Write(value);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteInt64(long value)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.LONG_TYPE);
- this.dataOut.Write(value);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteSingle(float value)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.FLOAT_TYPE);
- this.dataOut.Write(value);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteDouble(double value)
- {
- InitializeWriting();
- try
- {
- this.dataOut.Write(PrimitiveMap.DOUBLE_TYPE);
- this.dataOut.Write(value);
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteString(string value)
- {
- InitializeWriting();
- try
- {
- if(value.Length > 8192)
- {
- this.dataOut.Write(PrimitiveMap.BIG_STRING_TYPE);
- this.dataOut.WriteString32(value);
- }
- else
- {
- this.dataOut.Write(PrimitiveMap.STRING_TYPE);
- this.dataOut.WriteString16(value);
- }
- }
- catch(IOException e)
- {
- NMSExceptionSupport.Create(e);
- }
- }
-
- public void WriteObject(Object value)
- {
- InitializeWriting();
- if(value is System.Byte)
- {
- this.WriteByte((byte) value);
- }
- else if(value is Char)
- {
- this.WriteChar((char) value);
- }
- else if(value is Boolean)
- {
- this.WriteBoolean((bool) value);
- }
- else if(value is Int16)
- {
- this.WriteInt16((short) value);
- }
- else if(value is Int32)
- {
- this.WriteInt32((int) value);
- }
- else if(value is Int64)
- {
- this.WriteInt64((long) value);
- }
- else if(value is Single)
- {
- this.WriteSingle((float) value);
- }
- else if(value is Double)
- {
- this.WriteDouble((double) value);
- }
- else if(value is byte[])
- {
- this.WriteBytes((byte[]) value);
- }
- else if(value is String)
- {
- this.WriteString((string) value);
- }
- else
- {
- throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
- }
- }
-
- public override void ClearBody()
- {
- base.ClearBody();
- this.byteBuffer = null;
- this.dataIn = null;
- this.dataOut = null;
- this.bytesRemaining = -1;
- }
-
- public void Reset()
- {
- StoreContent();
- this.dataIn = null;
- this.dataOut = null;
- this.byteBuffer = null;
- this.bytesRemaining = -1;
- this.ReadOnlyBody = true;
- }
-
- private void InitializeReading()
- {
- FailIfWriteOnlyBody();
- if(this.dataIn == null)
- {
- // TODO - Add support for Message Compression.
- this.byteBuffer = new MemoryStream(this.Content, false);
- dataIn = new EndianBinaryReader(byteBuffer);
- }
- }
-
- private void InitializeWriting()
- {
- FailIfReadOnlyBody();
- if(this.dataOut == null)
- {
- // TODO - Add support for Message Compression.
- this.byteBuffer = new MemoryStream();
- this.dataOut = new EndianBinaryWriter(byteBuffer);
- }
- }
-
- private void StoreContent()
- {
- if(dataOut != null)
- {
- dataOut.Close();
- // TODO - Add support for Message Compression.
-
- this.Content = byteBuffer.ToArray();
- this.dataOut = null;
- this.byteBuffer = null;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/TextMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/TextMessage.cs b/src/main/csharp/TextMessage.cs
deleted file mode 100644
index 9e60dd4..0000000
--- a/src/main/csharp/TextMessage.cs
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-
-
-namespace Apache.NMS.Amqp
-{
- public class TextMessage : BaseMessage, ITextMessage
- {
- private String text;
-
- public TextMessage()
- {
- }
-
- public TextMessage(String text)
- {
- this.Text = text;
- }
-
- public override object Clone()
- {
- TextMessage tm = (TextMessage) base.Clone();
-
- tm.text = text;
- return (TextMessage)tm;
- }
-
- public override void ClearBody()
- {
- base.ClearBody();
-
- this.text = null;
- }
-
- // Properties
-
- public string Text
- {
- get
- {
- return text;
- }
-
- set
- {
- this.text = value;
- }
- }
-
- }
-}
-
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/Topic.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Topic.cs b/src/main/csharp/Topic.cs
deleted file mode 100644
index 62aaf38..0000000
--- a/src/main/csharp/Topic.cs
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-
-// Typedef for options map
-using OptionsMap = System.Collections.Generic.Dictionary<System.String, System.Object>;
-
-namespace Apache.NMS.Amqp
-{
-
- /// <summary>
- /// Summary description for Topic.
- /// </summary>
- public class Topic : Destination, ITopic
- {
-
- public Topic()
- : base()
- {
- }
-
- public Topic(String name)
- : base(name)
- {
- }
-
- public Topic(String name, string subject, OptionsMap options)
- : base(name, subject, options, "topic")
- {
- }
-
- override public DestinationType DestinationType
- {
- get
- {
- return DestinationType.Topic;
- }
- }
-
- public String TopicName
- {
- get { return Path; }
- }
-
-
- public override Destination CreateDestination(String name)
- {
- return new Topic(name);
- }
-
-
- public override Destination CreateDestination(String name, string subject, OptionsMap options)
- {
- return new Topic(name, subject, options);
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/ndoc/NamespaceSummary.xml
----------------------------------------------------------------------
diff --git a/src/main/ndoc/NamespaceSummary.xml b/src/main/ndoc/NamespaceSummary.xml
deleted file mode 100644
index b8e19d5..0000000
--- a/src/main/ndoc/NamespaceSummary.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<namespaces>
- <namespace name="NMS">
- The <b>NMS</b> namespace defines the .Net Message System API which is an interface to messaging systems rather like JMS is for Java.
- </namespace>
-</namespaces>
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/sandcastle/feedback_content.xml
----------------------------------------------------------------------
diff --git a/src/main/sandcastle/feedback_content.xml b/src/main/sandcastle/feedback_content.xml
deleted file mode 100644
index e44de7a..0000000
--- a/src/main/sandcastle/feedback_content.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<content xml:space="preserve">
-
- <item id="fb_alias">activemq.docs@apache.org</item>
- <item id="fb_product"></item>
- <item id="fb_deliverable"></item>
-
- <item id="fb_subject">Customer%20Feedback</item>
- <item id="fb_body">%0\dThank%20you%20for%20your%20feedback.%20The%20developer%20writing%20teams%20use%20your%20feedback%20to%20improve%20documentation.%20While%20we%20are%20reviewing%20your%20feedback,%20we%20may%20send%20you%20e-mail%20to%20ask%20for%20clarification%20or%20feedback%20on%20a%20solution.%20We%20do%20not%20use%20your%20e-mail%20address%20for%20any%20other%20purpose.%0\d</item>
-
- <item id="fb_headerFeedBack">Send Feedback</item>
-
-
- <!-- feedback values for sandcastle scenario -->
-
- <item id="feedback_alias"></item>
- <item id="feedback_product"></item>
- <item id="feedback_deliverable"></item>
- <item id="feedback_fileVersion"></item>
- <item id="feedback_topicVersion"></item>
- <item id="feedback_body"></item>
- <item id="feedback_subject"></item>
-
- <item id="fb_Introduction">We value your feedback. To rate this topic and send feedback about this topic to the documentation team, click a rating, and then click <b>Send Feedback</b>. For assistance with support issues, refer to the technical support information included with the product.</item>
-
- <item id="fb_Send">Send Feedback</item>
- <item id="fb_Poor">Poor</item>
- <item id="fb_Excellent">Outstanding</item>
- <item id="fb_EnterFeedbackText">To e-mail your feedback, click here:</item>
- <item id="fb_Title">Documentation Feedback</item>
- <item id="fb_altIcon">Display feedback instructions at the bottom of the page.</item>
-
-</content>
\ No newline at end of file