You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/08/05 19:52:53 UTC

svn commit: r1615967 - in /avro/trunk/lang/csharp/src/apache: ipc/HttpListenerServer.cs ipc/HttpTransceiver.cs test/Ipc/HttpClientServerTest.cs

Author: cutting
Date: Tue Aug  5 17:52:53 2014
New Revision: 1615967

URL: http://svn.apache.org/r1615967
Log:
C#: Add support for RPC over HTTP.  Contributed by Dmitry Kovalev.

Added:
    avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs
    avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs
    avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs

Added: avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs?rev=1615967&view=auto
==============================================================================
--- avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs (added)
+++ avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs Tue Aug  5 17:52:53 2014
@@ -0,0 +1,146 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Net;
+using System.IO;
+using System.Diagnostics;
+
+namespace Avro.ipc
+{
+    /// <summary>
+    /// Minimal HTTP server for Avro RPC built on <see cref="HttpListener"/>: every POST request with
+    /// content type "avro/binary" is read into buffers, handed to the <see cref="Responder"/>, and the
+    /// buffers it returns are written back as the response body.
+    /// </summary>
+    public class HttpListenerServer
+    {
+        private readonly IEnumerable<string> _prefixes;
+        private readonly Responder _responder;
+        private HttpListener _listener; //null while the server is not running
+
+        /// <summary>Creates a server; no listener is created until <see cref="Start"/> is called.</summary>
+        /// <param name="listenOnPrefixes">URI prefixes that are registered with the listener on start.</param>
+        /// <param name="responder">Responder that produces the reply buffers for each request.</param>
+        /// <exception cref="ArgumentNullException">Either argument is null.</exception>
+        public HttpListenerServer(IEnumerable<string> listenOnPrefixes, Responder responder)
+        {
+            if (listenOnPrefixes == null)
+                throw new ArgumentNullException("listenOnPrefixes");
+            if (responder == null)
+                throw new ArgumentNullException("responder");
+
+            _responder = responder;
+            _prefixes = listenOnPrefixes;
+        }
+
+        //TODO: apparently this doesn't compile in Mono - investigate
+        //public Action<Exception, IAsyncResult> ExceptionHandler { get; set; }
+
+        /// <summary>
+        /// Completion callback for BeginGetContext: accepts the request, starts waiting for the next
+        /// one, then reads the request buffers and writes the responder's reply.
+        /// Failures are logged through Debug.Print and answered with an error status instead of being rethrown.
+        /// </summary>
+        /// <param name="result">Async result whose state is the listener that began the accept.</param>
+        protected void HttpListenerCallback(IAsyncResult result)
+        {
+            HttpListenerContext context = null;
+            int failureStatus = (int)HttpStatusCode.BadRequest; //until the request has been read completely, a failure is reported as a bad request
+            try
+            {
+                HttpListener listener = (HttpListener)result.AsyncState;
+                if (_listener != listener) //the server which began this callback was stopped - just exit
+                    return;
+
+                try
+                {
+                    context = listener.EndGetContext(result);
+                }
+                finally
+                {
+                    //spawn listening for next request so it can be processed while we are dealing with this one; done even when this accept failed, otherwise the server would stop serving
+                    if (listener.IsListening)
+                        listener.BeginGetContext(HttpListenerCallback, listener);
+                }
+
+                //process this request
+                if (!"POST".Equals(context.Request.HttpMethod))
+                    throw new AvroRuntimeException("HTTP method must be POST");
+                if (!"avro/binary".Equals(context.Request.ContentType)) //literal first: ContentType is null when the header is missing
+                    throw new AvroRuntimeException("Content-type must be avro/binary");
+
+                var buffers = HttpTransceiver.ReadBuffers(context.Request.InputStream, new byte[4]);
+                failureStatus = (int)HttpStatusCode.InternalServerError; //from here on a failure is ours, not the client's
+
+                buffers = _responder.Respond(buffers);
+                context.Response.ContentType = "avro/binary";
+                context.Response.ContentLength64 = HttpTransceiver.CalculateLength(buffers);
+
+                HttpTransceiver.WriteBuffers(buffers, context.Response.OutputStream);
+
+                context.Response.OutputStream.Close();
+                context.Response.Close();
+            }
+            catch (Exception ex)
+            {
+                //TODO: hand ex to ExceptionHandler instead once that property can be re-enabled
+                Debug.Print("Exception occured while processing a web request, skipping this request: {0}", ex);
+                if (context != null)
+                    FailResponse(context, failureStatus);
+            }
+        }
+
+        /// <summary>Best-effort error reply so that a failed request is closed rather than left open; aborts the connection if a clean reply is no longer possible.</summary>
+        private static void FailResponse(HttpListenerContext context, int statusCode)
+        {
+            try
+            {
+                context.Response.StatusCode = statusCode;
+                context.Response.Close();
+            }
+            catch (Exception)
+            {
+                context.Response.Abort(); //a clean error reply could not be sent - drop the connection
+            }
+        }
+
+        /// <summary>Creates the listener, registers the configured prefixes and begins accepting requests asynchronously.</summary>
+        /// <exception cref="InvalidOperationException">The server is already started.</exception>
+        public void Start()
+        {
+            if (_listener != null)
+                throw new InvalidOperationException("The server is already started");
+
+            HttpListener listener = new HttpListener();
+            try
+            {
+                foreach (string prefix in _prefixes)
+                {
+                    listener.Prefixes.Add(prefix);
+                }
+                listener.Start();
+            }
+            catch (Exception)
+            {
+                listener.Close(); //don't leak a listener that failed to start
+                throw;
+            }
+
+            _listener = listener; //published before the first accept so the callback recognises it as the current listener
+            listener.BeginGetContext(HttpListenerCallback, listener);
+        }
+
+        /// <summary>Stops accepting requests and releases the listener; does nothing if the server is not running.</summary>
+        public void Stop()
+        {
+            HttpListener listener = _listener;
+            if (listener == null)
+                return;
+
+            _listener = null; //cleared first so callbacks that run during or after the shutdown exit early
+            listener.Stop();
+            listener.Close();
+        }
+    }
+}

Added: avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs?rev=1615967&view=auto
==============================================================================
--- avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs (added)
+++ avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs Tue Aug  5 17:52:53 2014
@@ -0,0 +1,198 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.IO;
+using System.Net;
+
+namespace Avro.ipc
+{
+    /// <summary>
+    /// Client-side <see cref="Transceiver"/> that carries Avro RPC over HTTP: each call is sent as one
+    /// POST request with content type "avro/binary" and the reply is read from that request's response.
+    /// Buffers travel as chunks, each a 4-byte network-order length followed by the data, closed by a zero length.
+    /// </summary>
+    public class HttpTransceiver : Transceiver
+    {
+        private readonly byte[] _intBuffer = new byte[4]; //this buffer is used by read/write behind the latch controlled by base class so we are sure there is no race condition
+        private readonly HttpWebRequest _modelRequest; //template whose settings are copied onto every request
+        private HttpWebRequest _httpRequest; //request created by the last WriteBuffers call; ReadBuffers reads its response
+
+        /// <summary>Absolute URI of the model request, i.e. the address the calls are posted to.</summary>
+        public override string RemoteName
+        {
+            get
+            {
+                return _modelRequest.RequestUri.AbsoluteUri;
+            }
+        }
+
+        /// <summary>Creates a transceiver whose requests copy their settings from <paramref name="modelRequest"/>.</summary>
+        /// <exception cref="ArgumentNullException"><paramref name="modelRequest"/> is null.</exception>
+        public HttpTransceiver(HttpWebRequest modelRequest)
+        {
+            if (modelRequest == null)
+                throw new ArgumentNullException("modelRequest");
+
+            _modelRequest = modelRequest;
+        }
+
+        /// <summary>Creates a transceiver that posts to <paramref name="serviceUri"/>.</summary>
+        /// <param name="serviceUri">Address the requests are sent to.</param>
+        /// <param name="timeoutMs">Value assigned to the request's Timeout property, in milliseconds.</param>
+        public HttpTransceiver(Uri serviceUri, int timeoutMs)
+        {
+            _modelRequest = (HttpWebRequest)WebRequest.Create(serviceUri);
+            _modelRequest.Method = "POST";
+            _modelRequest.ContentType = "avro/binary";
+            _modelRequest.Timeout = timeoutMs;
+        }
+
+        /// <summary>Fills the first <paramref name="count"/> bytes of the buffer, looping because a single Stream.Read may return fewer bytes than requested.</summary>
+        /// <exception cref="EndOfStreamException">The stream ended before all bytes arrived.</exception>
+        private static void ReadFully(Stream stream, byte[] buffer, int count)
+        {
+            int offset = 0;
+            while (offset < count)
+            {
+                int num = stream.Read(buffer, offset, count - offset);
+                if (num == 0)
+                    throw new EndOfStreamException(string.Format("Unexpected end of response binary stream - expected {0} more bytes in current chunk", count - offset));
+                offset += num;
+            }
+        }
+
+        /// <summary>Reads a 4-byte network-order integer from the stream, using <paramref name="buffer"/> as scratch space.</summary>
+        private static int ReadInt(Stream stream, byte[] buffer)
+        {
+            ReadFully(stream, buffer, 4);
+            return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(buffer, 0));
+        }
+
+        /// <summary>Encodes <paramref name="value"/> as 4 bytes in network byte order.</summary>
+        public static byte[] ConvertIntToBytes(int value)
+        {
+            return BitConverter.GetBytes(IPAddress.HostToNetworkOrder(value));
+        }
+
+        /// <summary>Number of bytes <see cref="WriteBuffers(IList{MemoryStream}, Stream)"/> emits for the given buffers.</summary>
+        /// <exception cref="OverflowException">The total does not fit into an int.</exception>
+        public static int CalculateLength(IList<MemoryStream> buffers)
+        {
+            checked //a silently wrapped total would become a wrong Content-Length
+            {
+                int total = 4; //the zero-length terminator
+                foreach (MemoryStream memoryStream in buffers)
+                {
+                    total += 4 + (int)memoryStream.Length; //length prefix + payload
+                }
+                return total;
+            }
+        }
+
+        /// <summary>Reads length-prefixed chunks from the stream until the zero-length terminator.</summary>
+        /// <param name="inStream">Stream positioned at the first length prefix.</param>
+        /// <param name="intBuffer">Scratch buffer of at least 4 bytes used to decode the length prefixes.</param>
+        /// <returns>One MemoryStream per chunk, in the order received.</returns>
+        /// <exception cref="EndOfStreamException">The stream ended inside a length prefix or a chunk.</exception>
+        /// <exception cref="InvalidDataException">A length prefix is negative.</exception>
+        public static IList<MemoryStream> ReadBuffers(Stream inStream, byte[] intBuffer)
+        {
+            List<MemoryStream> list = new List<MemoryStream>();
+            while (true)
+            {
+                int length = ReadInt(inStream, intBuffer);
+
+                if (length == 0) //end of transmission
+                    break;
+                if (length < 0)
+                    throw new InvalidDataException(string.Format("Invalid chunk length in binary stream: {0}", length));
+
+                byte[] buffer = new byte[length];
+                ReadFully(inStream, buffer, length);
+                list.Add(new MemoryStream(buffer));
+            }
+            return list;
+        }
+
+        /// <summary>Obtains the response of the request sent by the last WriteBuffers call and reads its buffers.</summary>
+        /// <exception cref="InvalidOperationException">No request has been written yet.</exception>
+        public override IList<MemoryStream> ReadBuffers()
+        {
+            if (_httpRequest == null)
+                throw new InvalidOperationException("WriteBuffers must be called before ReadBuffers");
+
+            using (WebResponse response = _httpRequest.GetResponse())
+            using (Stream responseStream = response.GetResponseStream())
+            {
+                return ReadBuffers(responseStream, _intBuffer);
+            }
+        }
+
+        /// <summary>Builds a new POST request to the model request's URI, copying the model's settings listed below.</summary>
+        /// <param name="contentLength">Value for the Content-Length header of the new request.</param>
+        protected HttpWebRequest CreateAvroHttpRequest(long contentLength)
+        {
+            HttpWebRequest wr = (HttpWebRequest)WebRequest.Create(_modelRequest.RequestUri);
+
+            //TODO: what else to copy from model request?
+            wr.AllowAutoRedirect = _modelRequest.AllowAutoRedirect;
+            wr.AllowWriteStreamBuffering = _modelRequest.AllowWriteStreamBuffering;
+            wr.AuthenticationLevel = _modelRequest.AuthenticationLevel;
+            wr.AutomaticDecompression = _modelRequest.AutomaticDecompression;
+            wr.CachePolicy = _modelRequest.CachePolicy;
+            wr.ClientCertificates.AddRange(_modelRequest.ClientCertificates);
+            wr.ConnectionGroupName = _modelRequest.ConnectionGroupName;
+            wr.ContinueDelegate = _modelRequest.ContinueDelegate;
+            wr.CookieContainer = _modelRequest.CookieContainer;
+            wr.Credentials = _modelRequest.Credentials;
+            wr.UnsafeAuthenticatedConnectionSharing = _modelRequest.UnsafeAuthenticatedConnectionSharing;
+            wr.UseDefaultCredentials = _modelRequest.UseDefaultCredentials;
+
+            wr.KeepAlive = _modelRequest.KeepAlive;
+            wr.Expect = _modelRequest.Expect;
+            //wr.Date = _modelRequest.Date;
+            //wr.Host = _modelRequest.Host;
+            wr.UserAgent = _modelRequest.UserAgent;
+            //wr.Headers = _modelRequest.Headers;
+            wr.Referer = _modelRequest.Referer;
+
+            wr.Pipelined = _modelRequest.Pipelined;
+            wr.PreAuthenticate = _modelRequest.PreAuthenticate;
+            wr.ProtocolVersion = _modelRequest.ProtocolVersion;
+            wr.Proxy = _modelRequest.Proxy;
+            wr.ReadWriteTimeout = _modelRequest.ReadWriteTimeout;
+            wr.Timeout = _modelRequest.Timeout;
+
+            //the properties which are defined by Avro specification
+            wr.Method = "POST";
+            wr.ContentType = "avro/binary";
+            wr.ContentLength = contentLength;
+
+            return wr;
+        }
+
+        /// <summary>Writes each buffer as a 4-byte length followed by its content, then the zero-length terminator, and flushes the stream.</summary>
+        public static void WriteBuffers(IList<MemoryStream> buffers, Stream outStream)
+        {
+            foreach (MemoryStream memoryStream in buffers)
+            {
+                int num = (int)memoryStream.Length;
+                outStream.Write(ConvertIntToBytes(num), 0, 4);
+                memoryStream.WriteTo(outStream);
+            }
+            outStream.Write(ConvertIntToBytes(0), 0, 4);
+            outStream.Flush();
+        }
+
+        /// <summary>Creates a fresh request sized for the buffers and writes them to its request stream; the response is fetched later by ReadBuffers.</summary>
+        public override void WriteBuffers(IList<MemoryStream> buffers)
+        {
+            _httpRequest = CreateAvroHttpRequest(CalculateLength(buffers));
+            using (Stream requestStream = _httpRequest.GetRequestStream())
+            {
+                WriteBuffers(buffers, requestStream);
+            }
+        }
+    }
+}

Added: avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs?rev=1615967&view=auto
==============================================================================
--- avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs (added)
+++ avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs Tue Aug  5 17:52:53 2014
@@ -0,0 +1,78 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Net;
+
+using NUnit.Framework;
+//using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+using Avro.ipc;
+using Avro.ipc.Generic;
+using Avro.Generic;
+
+namespace Avro.Test.Ipc
+{
+    /// <summary>
+    /// Round-trip test for Avro RPC over HTTP: an <see cref="HttpListenerServer"/> backed by a
+    /// <see cref="MailResponder"/> is called through an <see cref="HttpTransceiver"/> on localhost.
+    /// The commented-out attributes belong to the Visual Studio unit testing framework named in the commented-out using.
+    /// </summary>
+    [TestFixture]
+    //[TestClass]
+    public class HttpClientServerTest
+    {
+        const string URL = @"http://localhost:18080/avro/test/ipc/mailResponder/";
+
+        private HttpListenerServer server;
+        private MailResponder mailResponder;
+        private HttpTransceiver transceiver;
+        private GenericRequestor proxy;
+
+        /// <summary>Starts the server on <see cref="URL"/> and builds the requestor that talks to it.</summary>
+        [TestFixtureSetUp]
+        //[TestInitialize]
+        public void Init()
+        {
+            mailResponder = new MailResponder();
+            server = new HttpListenerServer(new[] { URL }, mailResponder);
+            server.Start();
+
+            var template = (HttpWebRequest)WebRequest.Create(URL);
+            template.Timeout = 6000;
+            template.Proxy = null;
+
+            transceiver = new HttpTransceiver(template);
+            proxy = new GenericRequestor(transceiver, MailResponder.Protocol);
+        }
+
+        /// <summary>Stops the server started by <see cref="Init"/>.</summary>
+        [TestFixtureTearDown]
+        //[TestCleanup]
+        public void Cleanup()
+        {
+            server.Stop();
+        }
+
+        /// <summary>Wraps the message in a "send" request record, issues it through the proxy and returns the reply as a string.</summary>
+        private string Send(GenericRecord message)
+        {
+            var sendRequest = new GenericRecord(MailResponder.Protocol.Messages["send"].Request);
+            sendRequest.Add("message", message);
+            return (string)proxy.Request("send", sendRequest);
+        }
+
+        /// <summary>Performs five consecutive request/response round trips and verifies every reply.</summary>
+        [Test]
+        //[TestMethod]
+        public void TestRequestResponse()
+        {
+            int remaining = 5;
+            while (remaining-- > 0)
+            {
+                var message = SocketServerTest.CreateMessage();
+                SocketServerTest.VerifyResponse(Send(message));
+            }
+        }
+    }
+}