You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Aidan Skinner (JIRA)" <qp...@incubator.apache.org> on 2008/11/11 15:43:44 UTC

[jira] Updated: (QPID-1146) Excel RTD Server

     [ https://issues.apache.org/jira/browse/QPID-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aidan Skinner updated QPID-1146:
--------------------------------

    Fix Version/s:     (was: M4)

> Excel RTD Server
> ----------------
>
>                 Key: QPID-1146
>                 URL: https://issues.apache.org/jira/browse/QPID-1146
>             Project: Qpid
>          Issue Type: New Feature
>          Components: Dot Net Client
>    Affects Versions: M2
>         Environment: Windows .NET with Excel
>            Reporter: Shahbaz Chaudhary
>            Assignee: Aidan Skinner
>         Attachments: RTDTest.cs, RTDTest.cs
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> --QUOTED FROM AN EMAIL SENT TO QPID'S MAILING LIST--
> Hi All,
> The following email contains an Excel RTD server which is able to subscribe to information from an M2 Qpid server.
> This is just a proof of concept, there are almost no optimizations, check for leaks, etc.  I'm not really a .NET programmer and I have never done any COM programming (and no C++ since college).
> In any case, try it out.  It works for me.  I haven't figured out how to run it outside Visual Studio 2008 yet (no idea how to create install packages, register assemblies, etc.).
> QPID C# folks are welcome to use it as they wish.  I probably won't maintain this, hopefully someone else will.
> In excel, you just have to type the following formula:
> =rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:567
> 2'","<topic>","<field>")
> This will subscribe to a topic, each time an update is received, it will retrieve a field and display it in Excel.
> Example:
> =rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:567
> 2'","md.bidsoffers","price")
> This will just display the price for all incoming bidsoffers (so MSFT/ORCL/EBAY will be mixed in)
> You can also add 'filters' to the RTD function.  Just append two parameters to the end of the previous RTD function, the first parameter refers to the field you wish to use in comparison, the second parameter refers to the value the first parameter must have.
> Example:
> =rtd("rtd.test",,"amqp://guest:guest@1/test?brokerlist='tcp://<host>:567
> 2'","md.bidsoffers","price",
> "symbol","MSFT")
> This will subscribe to the same information as the last RTD function, but will only display prices for MSFT.
> You can add as many filters as you like, just make sure you append the RTD function with a filter field and a filter value.
> --8<------------------------------RTDTest.cs----------------------------
> -----
> using System;
> using System.Collections.Generic;
> using System.Linq;
> using System.Text;
> using Apache.Qpid.Messaging;
> using Apache.Qpid.Client.Qms;
> using Apache.Qpid.Client;
> using System.Runtime.InteropServices;
> using Microsoft.Office.Interop.Excel;
> //Shahbaz Chaudhary
> namespace RTDTest
> {
>     [ComVisible(true), ProgId("RTD.Test")]
>     public class RTDTest : IRtdServer
>     {
>         //QPID CACHE
>         Dictionary<string, IChannel> channelCache;//url, channel
>         Dictionary<string, int> channelCacheCount;
>         Dictionary<string, IMessageConsumer> topicCache;//url+topic, consumer
>         Dictionary<string, int> topicCacheCount;
>         Dictionary<int, string> topicIDCache;//url+topic+field,  topicid
>         Dictionary<string, IList<Tuple2<int, string>>> topicTopicIDFieldCache;
>         Dictionary<int, Tuple2<string[], string[]>> filtersCache;
>         //END QPID CACHE
>         //IRTDServer Globals
>         IRTDUpdateEvent updateEvent;
>         Queue<Tuple2<int, object>> refreshQ;
>         //END IRTDServer Globals
>         public RTDTest()
>         {
>             channelCache = new Dictionary<string, IChannel>();
>             topicCache = new Dictionary<string, IMessageConsumer>();
>             channelCacheCount = new Dictionary<string, int>();
>             topicCacheCount = new Dictionary<string, int>();
>             topicIDCache = new Dictionary<int, string>();
>             topicTopicIDFieldCache = new Dictionary<string, IList<Tuple2<int, string>>>();
>             filtersCache = new Dictionary<int, Tuple2<string[], string[]>>();
>             refreshQ = new Queue<Tuple2<int, object>>();
>         }
>         //QPID METHODS
>         private IChannel getChannel(string url)
>         {
>             IChannel chan;
>             if (channelCache.ContainsKey(url))
>             {
>                 chan = channelCache[url];
>             }
>             else
>             {
>                 IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(url);
>                 Apache.Qpid.Messaging.IConnection connection = new AMQConnection(connectionInfo);
>                 IChannel channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
>                 connection.Start();
>                 chan = channel;
>                 channelCache[url] = chan;
>             }
>             return chan;
>         }
>         private IMessageConsumer getTopicConsumer(string url, string
> topic)
>         {
>             IMessageConsumer cons;
>             string key = url + topic;
>             if (topicCache.ContainsKey(key))
>             {
>                 cons = topicCache[key];
>             }
>             else
>             {
>                 IChannel channel = getChannel(url);
>                 string tempQ = channel.GenerateUniqueName();
>                 channel.DeclareQueue(tempQ, false, true, true);
>                 cons = channel.CreateConsumerBuilder(tempQ).Create();
>                 channel.Bind(tempQ, ExchangeNameDefaults.TOPIC, topic);
>                 topicCache[key] = cons;
>             }
>             return cons;
>         }
>         private IList<Tuple2<int, string>> getFields(string topic)
>         {
>             if (!topicTopicIDFieldCache.ContainsKey(topic))
>             {
>                 topicTopicIDFieldCache[topic] = new List<Tuple2<int,
> string>>();
>             }
>             return topicTopicIDFieldCache[topic];
>         }
>         private void onMessage(IMessage msg, string url, string topic, string field, int topicid)
>         {
>                 foreach (Tuple2<int, string> f in getFields(topic))//?
>                 {
>                     int id = f.a;
>                     object value = msg.Headers[f.b];
>                     //Dictionary<int, object> d = new Dictionary<int,
> object>();
>                     //d.Add(id, value);
>                     string[] filterFields = filtersCache[id].a;
>                     string[] filterVals = filtersCache[id].b;
>                     if(allFiltersTrue(filterFields,filterVals,msg)){
>                         refreshQ.Enqueue(new Tuple2<int,object>(id,value));
>                     }
>                 }
>                 try
>                 {
>                     updateEvent.UpdateNotify();
>                 }
>                 catch (COMException e)
>                 {
>                 }
>         }
>         void registerTopicID(string url, string topic, string field, int
> topicid)
>         {
>             string val = url + "|" + topic + "|" + field;
>             topicIDCache.Add(topicid, val);
>             Tuple2<int, string> dict = new Tuple2<int,
> string>(topicid,field);
>             getFields(topic).Add(dict);
>             if (!channelCacheCount.ContainsKey(url))
> channelCacheCount[url] = 0;
>             channelCacheCount[url]++;
>             if (!topicCacheCount.ContainsKey(url + "|" + topic)) topicCacheCount[url + "|" + topic] = 0;
>             topicCacheCount[url + "|" + topic]++;
>             getTopicConsumer(url, topic).OnMessage += msg => { onMessage(msg,url, topic, field, topicid); };
>         }
>         private bool allFiltersTrue(string[] filterKeys, string[] filterVals, IMessage msg)
>         {
>             for (int i = 0; i < filterKeys.Length; i++)
>             {
>                 if
> (!msg.Headers[filterKeys[i]].ToString().Equals(filterVals[i]))
>                 {
>                     return false;
>                 }
>             }
>             return true;
>         }
>         public void removeRegisteredTopic(int topicid)
>         {
>             string vals = topicIDCache[topicid];
>             string[] keys = vals.Split(new char[] { '|' });
>             string url = keys[0];
>             string topic = keys[1];
>             string field = keys[2];
>             channelCacheCount[url]--;
>             topicCacheCount[url + "|" + topic]--;
>             if (channelCacheCount[url] <= 0)
>             {
>                 channelCacheCount.Remove(url);
>                 channelCache[url].Dispose();
>                 channelCache.Remove(url);
>             }
>             if (topicCacheCount[url + "|" + topic] <= 0)
>             {
>                 topicCacheCount.Remove(url + "|" + topic);
>                 topicCache[url + "|" + topic].Dispose();
>                 topicCache.Remove(url + "|" + topic);
>                 topicTopicIDFieldCache.Remove(topic);
>             }
>             filtersCache.Remove(topicid);
>         }
>         //END QPID METHODS
>  
> //----------------------------------------------------------------------
> -----------------------------------
>         //IRTDServer METHODS
>         #region IRtdServer Members
>         public int ServerStart(IRTDUpdateEvent CallbackObject)
>         {
>             updateEvent = CallbackObject;
>             return 1;
>         }
>         public object ConnectData(int TopicID, ref Array Strings, ref bool GetNewValues)
>         {
>             int size = Strings.Length;
>             int conditions = (int)Math.Floor((double)(size - 3) / 2);
>             string url;
>             string topic;
>             string field;
>             string[] filterKeys = new string[conditions];
>             string[] filterVals = new string[conditions];
>             url = (string)Strings.GetValue(0);
>             topic = (string)Strings.GetValue(1);
>             field = (string)Strings.GetValue(2);
>             for (int i = 0; i < conditions; i = i + 2)
>             {
>                 filterKeys[i] = (string)Strings.GetValue(i + 3);
>                 filterVals[i] = (string)Strings.GetValue(i + 1 + 3);
>             }
>             Tuple2<string[], string[]> filters = new Tuple2<string[], string[]>(filterKeys,filterVals);
>             filtersCache.Add(TopicID, filters);
>             registerTopicID(url, topic, field, TopicID);
>             return "Getting data...";
>         }
>         public void DisconnectData(int TopicID)
>         {
>             removeRegisteredTopic(TopicID);
>         }
>         public int Heartbeat()
>         {
>             return 1;
>         }
>         public Array RefreshData(ref int TopicCount)
>         {
>             Tuple2<int, object> data;
>             object[,] result = new object[2, refreshQ.Count];
>             TopicCount = 0;
>             for (int i = 0; i < refreshQ.Count; i++)
>             {
>                 data = refreshQ.Dequeue();
>                 TopicCount++;
>                 result[0, i] = data.a;
>                 result[1, i] = data.b;
>             }
>             return result;
>         }
>         public void ServerTerminate()
>         {
>             foreach (IChannel c in channelCache.Values)
>             {
>                 c.Dispose();
>             }
>         }
>         #endregion
>         //END IRTDServer METHODS
>     }
>     class Tuple2<T, U>
>     {
>         public Tuple2(T t, U u)
>         {
>             a = t;
>             b = u;
>         }
>         public T a { get; set; }
>         public U b { get; set; }
>     }
>     class Tuple3<T, U, V>
>     {
>         public Tuple3(T t, U u, V v)
>         {
>             a = t;
>             b = u;
>             c = v;
>         }
>         public T a { get; set; }
>         public U b { get; set; }
>         public V c { get; set; }
>     }
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.