You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/08/05 19:52:53 UTC
svn commit: r1615967 - in /avro/trunk/lang/csharp/src/apache:
ipc/HttpListenerServer.cs ipc/HttpTransceiver.cs
test/Ipc/HttpClientServerTest.cs
Author: cutting
Date: Tue Aug 5 17:52:53 2014
New Revision: 1615967
URL: http://svn.apache.org/r1615967
Log:
C#: Add support for RPC over HTTP. Contributed by Dmitry Kovalev.
Added:
avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs
avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs
avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs
Added: avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs?rev=1615967&view=auto
==============================================================================
--- avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs (added)
+++ avro/trunk/lang/csharp/src/apache/ipc/HttpListenerServer.cs Tue Aug 5 17:52:53 2014
@@ -0,0 +1,87 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Net;
+using System.IO;
+using System.Diagnostics;
+
+namespace Avro.ipc
+{
+ public class HttpListenerServer
+ {
+ IEnumerable<string> _prefixes;
+ HttpListener _listener;
+ Responder _responder;
+
+ public HttpListenerServer(IEnumerable<string> listenOnPrefixes, Responder responder)
+ {
+ _responder = responder;
+ _prefixes = listenOnPrefixes;
+ }
+
+ //TODO: apparently this doesn't compile in Mono - investigate
+ //public Action<Exception, IAsyncResult> ExceptionHandler { get; set; }
+
+ protected void HttpListenerCallback(IAsyncResult result)
+ {
+ try
+ {
+ HttpListener listener = (HttpListener)result.AsyncState;
+ if (_listener != listener) //the server which began this callback was stopped - just exit
+ return;
+ HttpListenerContext context = listener.EndGetContext(result);
+
+ listener.BeginGetContext(HttpListenerCallback, listener); //spawn listening for next request so it can be processed while we are dealing with this one
+
+ //process this request
+ if (!context.Request.HttpMethod.Equals("POST"))
+ throw new AvroRuntimeException("HTTP method must be POST");
+ if (!context.Request.ContentType.Equals("avro/binary"))
+ throw new AvroRuntimeException("Content-type must be avro/binary");
+
+ byte[] intBuffer = new byte[4];
+ var buffers = HttpTransceiver.ReadBuffers(context.Request.InputStream, intBuffer);
+
+ buffers = _responder.Respond(buffers);
+ context.Response.ContentType = "avro/binary";
+ context.Response.ContentLength64 = HttpTransceiver.CalculateLength(buffers);
+
+ HttpTransceiver.WriteBuffers(buffers, context.Response.OutputStream);
+
+ context.Response.OutputStream.Close();
+ context.Response.Close();
+ }
+ catch (Exception ex)
+ {
+ //TODO: apparently this doesn't compile in Mono - investigate
+ //if (ExceptionHandler != null)
+ // ExceptionHandler(ex, result);
+ //else
+ // Debug.Print("Exception occured while processing a request, no exception handler was provided - ignoring", ex);
+ Debug.Print("Exception occured while processing a web request, skipping this request: ", ex);
+ }
+ }
+
+ public void Start()
+ {
+ _listener = new HttpListener();
+
+ foreach (string s in _prefixes)
+ {
+ _listener.Prefixes.Add(s);
+ }
+
+ _listener.Start();
+
+ _listener.BeginGetContext(HttpListenerCallback, _listener);
+ }
+
+ public void Stop()
+ {
+ _listener.Stop();
+ _listener.Close();
+ _listener = null;
+ }
+ }
+}
Added: avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs?rev=1615967&view=auto
==============================================================================
--- avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs (added)
+++ avro/trunk/lang/csharp/src/apache/ipc/HttpTransceiver.cs Tue Aug 5 17:52:53 2014
@@ -0,0 +1,156 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.IO;
+using System.Net;
+
+namespace Avro.ipc
+{
+ public class HttpTransceiver : Transceiver
+ {
+ private byte[] _intBuffer = new byte[4]; //this buffer is used by read/write behind the latch controlled by base class so we are sure there is no race condition
+ private HttpWebRequest _httpRequest;
+ private HttpWebRequest _modelRequest;
+
+ public override string RemoteName
+ {
+ get
+ {
+ return _modelRequest.RequestUri.AbsoluteUri;
+ }
+ }
+
+ public HttpTransceiver(HttpWebRequest modelRequest)
+ {
+ _modelRequest = modelRequest;
+ }
+
+ public HttpTransceiver(Uri serviceUri, int timeoutMs)
+ {
+ _modelRequest = (HttpWebRequest)WebRequest.Create(serviceUri);
+ _modelRequest.Method = "POST";
+ _modelRequest.ContentType = "avro/binary";
+ _modelRequest.Timeout = timeoutMs;
+ }
+
+ private static int ReadInt(Stream stream, byte[] buffer)
+ {
+ stream.Read(buffer, 0, 4);
+ return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(buffer, 0));
+ }
+
+ public static byte[] ConvertIntToBytes(int value)
+ {
+ return BitConverter.GetBytes(IPAddress.HostToNetworkOrder(value));
+ }
+
+ public static int CalculateLength(IList<MemoryStream> buffers)
+ {
+ int num = 0;
+ foreach (MemoryStream memoryStream in (IEnumerable<MemoryStream>)buffers)
+ {
+ num += 4;
+ num += (int)memoryStream.Length;
+ }
+ return num + 4;
+ }
+
+ public static IList<MemoryStream> ReadBuffers(Stream inStream, byte[] intBuffer)
+ {
+ List<MemoryStream> list = new List<MemoryStream>();
+ while (true)
+ {
+ int length = ReadInt(inStream, intBuffer);
+
+ if (length == 0) //end of transmission
+ break;
+
+ byte[] buffer = new byte[length];
+ int offset = 0;
+ int count = length;
+ while (offset < length)
+ {
+ int num = inStream.Read(buffer, offset, count);
+ if (num == 0)
+ throw new Exception(string.Format("Unexpected end of response binary stream - expected {0} more bytes in current chunk", (object)count));
+ offset += num;
+ count -= num;
+ }
+
+ list.Add(new MemoryStream(buffer));
+ }
+ return (IList<MemoryStream>)list;
+ }
+
+ public override IList<MemoryStream> ReadBuffers()
+ {
+ using (Stream responseStream = this._httpRequest.GetResponse().GetResponseStream())
+ {
+ return ReadBuffers(responseStream, _intBuffer);
+ }
+ }
+
+ protected HttpWebRequest CreateAvroHttpRequest(long contentLength)
+ {
+ HttpWebRequest wr = (HttpWebRequest)WebRequest.Create(_modelRequest.RequestUri);
+
+ //TODO: what else to copy from model request?
+ wr.AllowAutoRedirect = _modelRequest.AllowAutoRedirect;
+ wr.AllowWriteStreamBuffering = _modelRequest.AllowWriteStreamBuffering;
+ wr.AuthenticationLevel = _modelRequest.AuthenticationLevel;
+ wr.AutomaticDecompression = _modelRequest.AutomaticDecompression;
+ wr.CachePolicy = _modelRequest.CachePolicy;
+ wr.ClientCertificates.AddRange(_modelRequest.ClientCertificates);
+ wr.ConnectionGroupName = _modelRequest.ConnectionGroupName;
+ wr.ContinueDelegate = _modelRequest.ContinueDelegate;
+ wr.CookieContainer = _modelRequest.CookieContainer;
+ wr.Credentials = _modelRequest.Credentials;
+ wr.UnsafeAuthenticatedConnectionSharing = _modelRequest.UnsafeAuthenticatedConnectionSharing;
+ wr.UseDefaultCredentials = _modelRequest.UseDefaultCredentials;
+
+ wr.KeepAlive = _modelRequest.KeepAlive;
+ wr.Expect = _modelRequest.Expect;
+ //wr.Date = _modelRequest.Date;
+ //wr.Host = _modelRequest.Host;
+ wr.UserAgent = _modelRequest.UserAgent;
+ //wr.Headers = _modelRequest.Headers;
+ wr.Referer = _modelRequest.Referer;
+
+ wr.Pipelined = _modelRequest.Pipelined;
+ wr.PreAuthenticate = _modelRequest.PreAuthenticate;
+ wr.ProtocolVersion = _modelRequest.ProtocolVersion;
+ wr.Proxy = _modelRequest.Proxy;
+ wr.ReadWriteTimeout = _modelRequest.ReadWriteTimeout;
+ wr.Timeout = _modelRequest.Timeout;
+
+ //the properties which are defined by Avro specification
+ wr.Method = "POST";
+ wr.ContentType = "avro/binary";
+ wr.ContentLength = contentLength;
+
+ return wr;
+ }
+
+ public static void WriteBuffers(IList<MemoryStream> buffers, Stream outStream)
+ {
+ foreach (MemoryStream memoryStream in buffers)
+ {
+ int num = (int)memoryStream.Length;
+ outStream.Write(ConvertIntToBytes(num), 0, 4);
+ memoryStream.WriteTo(outStream);
+ }
+ outStream.Write(ConvertIntToBytes(0), 0, 4);
+ outStream.Flush();
+ }
+
+ public override void WriteBuffers(IList<MemoryStream> buffers)
+ {
+ _httpRequest = CreateAvroHttpRequest(CalculateLength(buffers));
+ using (Stream requestStream = _httpRequest.GetRequestStream())
+ {
+ WriteBuffers(buffers, requestStream);
+ }
+ }
+ }
+}
Added: avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs?rev=1615967&view=auto
==============================================================================
--- avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs (added)
+++ avro/trunk/lang/csharp/src/apache/test/Ipc/HttpClientServerTest.cs Tue Aug 5 17:52:53 2014
@@ -0,0 +1,72 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Net;
+
+using NUnit.Framework;
+//using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+using Avro.ipc;
+using Avro.ipc.Generic;
+using Avro.Generic;
+
+namespace Avro.Test.Ipc
+{
+ [TestFixture]
+ //[TestClass]
+ public class HttpClientServerTest
+ {
+ private HttpListenerServer server;
+ private MailResponder mailResponder;
+ private HttpTransceiver transceiver;
+ private GenericRequestor proxy;
+
+ const string URL = @"http://localhost:18080/avro/test/ipc/mailResponder/";
+
+ [TestFixtureSetUp]
+ //[TestInitialize]
+ public void Init()
+ {
+ mailResponder = new MailResponder();
+
+ server = new HttpListenerServer(new string[] { URL }, mailResponder);
+ server.Start();
+
+ HttpWebRequest requestTemplate = (HttpWebRequest)HttpWebRequest.Create(URL);
+ requestTemplate.Timeout = 6000;
+ requestTemplate.Proxy = null;
+ transceiver = new HttpTransceiver(requestTemplate);
+ proxy = new GenericRequestor(transceiver, MailResponder.Protocol);
+ }
+
+ [TestFixtureTearDown]
+ //[TestCleanup]
+ public void Cleanup()
+ {
+ server.Stop();
+ }
+
+ private string Send(GenericRecord message)
+ {
+ var request = new GenericRecord(MailResponder.Protocol.Messages["send"].Request);
+ request.Add("message", message);
+
+ var result = (string)proxy.Request("send", request);
+ return result;
+ }
+
+ [Test]
+ //[TestMethod]
+ public void TestRequestResponse()
+ {
+ for (int x = 0; x < 5; x++)
+ {
+ var message = SocketServerTest.CreateMessage();
+
+ var result = Send(message);
+ SocketServerTest.VerifyResponse(result);
+ }
+ }
+ }
+}