You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@etch.apache.org by fi...@apache.org on 2011/05/15 23:07:42 UTC

svn commit: r1103543 - in /incubator/etch/trunk: binding-csharp/runtime/src/main/csharp/ binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/ binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/ b...

Author: fitzner
Date: Sun May 15 21:07:41 2011
New Revision: 1103543

URL: http://svn.apache.org/viewvc?rev=1103543&view=rev
Log:
[ETCH-157]
binding-csharp: add udp transport support

Make udp and tcp protocols seamlessly interchangeable in etch.
The differences between the underlying protocols remain.

This patch comes from
Armin Mueller <mu...@itestra.com>
Aleksandar Kanchev <ka...@itestra.com>

Added:
    incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs
    incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs
    incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs
    incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs
Modified:
    incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj
    incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs
    incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs
    incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs

Modified: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj?rev=1103543&r1=1103542&r2=1103543&view=diff
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj (original)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj Sun May 15 21:07:41 2011
@@ -127,6 +127,7 @@
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\TaggedDataOutput.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\TcpTransportFactory.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\TransportMessage.cs" />
+    <Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\UdpTransportFactory.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\UnwantedMessage.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\AbstractStartable.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\AlarmListener.cs" />
@@ -180,6 +181,9 @@
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\TransportConsts.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\TransportData.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\TransportPacket.cs" />
+    <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\UdpConnection.cs" />
+    <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\UdpListener.cs" />
+    <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\UdpPacket.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\URL.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\URLSerializer.cs" />
     <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\Who.cs" />

Modified: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs?rev=1103543&r1=1103542&r2=1103543&view=diff
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs (original)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs Sun May 15 21:07:41 2011
@@ -226,6 +226,7 @@ namespace Org.Apache.Etch.Bindings.Cshar
         {
             Define("tcp", new TcpTransportFactory(false));
             Define("tls", new TcpTransportFactory(true));
+            Define("udp", new UdpTransportFactory());
 
             DefineFilter("KeepAlive", "Org.Apache.Etch.Bindings.Csharp.Transport.Filter.KeepAlive");
             DefineFilter("PwAuth", "Org.Apache.Etch.Bindings.Csharp.Transport.Filter.PwAuth");

Added: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs?rev=1103543&view=auto
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs (added)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs Sun May 15 21:07:41 2011
@@ -0,0 +1,152 @@
+// $Id$
+// 
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// 
+using System;
+using System.Net;
+using System.Net.Sockets;
+using Org.Apache.Etch.Bindings.Csharp.Msg;
+using Org.Apache.Etch.Bindings.Csharp.Support;
+using Org.Apache.Etch.Bindings.Csharp.Util;
+
+namespace Org.Apache.Etch.Bindings.Csharp.Transport
+{
+    public class UdpTransportFactory : TransportFactory
+    {
+        private const String UDP_LISTENER = "UdpTransportFactory.udpListener";
+        private const String SOCKET_ADDRESS = "UdpTransportFactory.socketAddress";
+
+        public UdpTransportFactory()
+        {
+        }
+
+        protected override TransportMessage NewTransport( string uri, Resources resources )
+        {
+            UdpListener udpListener = resources.Get( UDP_LISTENER ) as UdpListener;
+            IPEndPoint ipEndPoint = resources.Get(SOCKET_ADDRESS) as IPEndPoint;
+
+            URL url = new URL( uri );
+            TransportPacket transportPacket = null;
+
+            if (udpListener != null)
+                transportPacket = new UdpConnection( ipEndPoint, udpListener );
+            else
+                transportPacket = new UdpConnection( url );
+
+            TransportMessage transportMessage = new Messagizer( transportPacket, url, resources );
+
+            transportMessage = AddFilters( transportMessage, url, resources );
+
+            ValueFactory vf = (ValueFactory)resources.Get( TransportConsts.VALUE_FACTORY );
+            vf.LockDynamicTypes();
+
+            return transportMessage;
+        }
+
+        protected override Transport<ServerFactory> NewListener( string uri, Resources resources )
+        {
+            UdpListener transportListener = new UdpListener( uri, resources );
+            return new MySessionListener( this, transportListener, uri, resources );
+        }
+
+        public class MySessionListener : Transport<ServerFactory>, SessionListener<IPEndPoint>
+        {
+            private readonly UdpTransportFactory transportFactory;
+            private readonly UdpListener listener;
+            private readonly string uri;
+            private readonly Resources resources;
+
+            public MySessionListener( UdpTransportFactory transportFactory, UdpListener listener, String uri, Resources resources )
+            {
+                this.transportFactory = transportFactory;
+                this.listener = listener;
+                this.uri = uri;
+                this.resources = resources;
+
+                listener.SetSession( this );
+            }
+
+            public override string ToString()
+            {
+                return "UdpTransportFactory.MySessionListener/" + listener;
+            }
+
+            public void SessionAccepted( IPEndPoint ipEndPoint )
+            {
+                ValueFactory vf = session.NewValueFactory( uri );
+                Resources r = new Resources( resources );
+
+                r.Add( UDP_LISTENER, listener );
+                r.Add( SOCKET_ADDRESS, ipEndPoint);
+                r.Add( TransportConsts.VALUE_FACTORY, vf );
+
+                TransportMessage t = transportFactory.NewTransport( uri, r );
+
+                session.NewServer( t, uri, r );
+            }
+
+            #region Transport<ServerFactory> Members
+            public object TransportQuery( object query )
+            {
+                return listener.TransportQuery( query );
+            }
+
+            public void TransportControl( object control, object value )
+            {
+                listener.TransportControl( control, value );
+            }
+
+            public void TransportNotify( object eventObj )
+            {
+                listener.TransportNotify( eventObj );
+            }
+
+
+            private ServerFactory session;
+
+            public ServerFactory GetSession()
+            {
+                return session;
+            }
+
+            public void SetSession( ServerFactory session )
+            {
+                this.session = session;
+            }
+            #endregion
+
+            #region Session Members
+            public object SessionQuery( object query )
+            {
+                return session.SessionQuery( query );
+            }
+
+            public void SessionControl( object control, object value )
+            {
+                session.SessionControl( control, value );
+            }
+
+            public void SessionNotify( object eventObj )
+            {
+                session.SessionNotify( eventObj );
+            }
+            #endregion
+        }
+
+    }
+}

Added: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs?rev=1103543&view=auto
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs (added)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs Sun May 15 21:07:41 2011
@@ -0,0 +1,304 @@
+// $Id$
+// 
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// 
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+
+namespace Org.Apache.Etch.Bindings.Csharp.Util
+{
+    public class UdpConnection : Connection<SessionPacket>, TransportPacket
+    {
+        public const string BROADCAST = "UdpConnection.broadcast";
+
+        private IPEndPoint remoteIpEndPoint;
+        private readonly UdpListener listener;
+        private BlockingCollection<UdpPacket> readQueue;
+
+        private readonly int delay;
+        private readonly bool enableBroadcast;
+        private readonly string host;
+        private readonly int port;
+
+        private UdpClient udpClient;
+
+        public UdpConnection( IPEndPoint remoteIpEndPoint, UdpListener listener )
+        {
+            this.remoteIpEndPoint = remoteIpEndPoint;
+            this.listener = listener;
+
+            readQueue = listener.allocReadQueue( remoteIpEndPoint );
+        }
+
+        public UdpConnection( string host, int? port, bool enableBroadcast, int delay )
+        {
+            if ( host == null )
+                throw new ArgumentNullException( "host", "host is missing" ); // (paramName, message)
+            else if ( host == "255.255.255.255" )
+                enableBroadcast = true; // targeting the broadcast address forces broadcast on
+
+            if ( port == null )
+                throw new ArgumentNullException( "port" );
+            else if ( port <= 0 || port >= 65536 )
+                throw new ArgumentException( "port <= 0 || port >= 65536" );
+
+            this.host = host;
+            this.port = (int)port;
+            this.enableBroadcast = enableBroadcast;
+            this.delay = delay; // reconnect wait passed to Monitor.Wait in OpenSocket; 0 means do not retry
+
+            listener = null; // client-side connection: no shared listener, OpenSocket creates its own UdpClient
+        }
+
+        public UdpConnection( URL uri )
+            : this(uri.Host, uri.Port, uri.GetBooleanTerm( BROADCAST, false ), 0)
+        {
+            // nothing else.
+        }
+
+        public UdpConnection( string uri )
+            : this( new URL( uri ) )
+        {
+            // nothing else.
+        }
+
+        public override string ToString()
+        {
+            if (listener != null || udpClient != null)
+                return String.Format( "UdpConnection(up, {0}, {1})", LocalAddress(), RemoteAddress() );
+
+            return String.Format( "UdpConnection(down, {0}, {1})", host, port );
+        }
+
+        #region Connection<SessionData> Member
+        [MethodImpl( MethodImplOptions.Synchronized )]
+        protected override bool OpenSocket( bool reconnect )
+        {
+            // if a one time connection from a server socket listener, just
+            // return the existing socket. Bail if this is a reconnect.
+            if ( listener != null )
+            {
+                if ( !reconnect && readQueue == null )
+                    readQueue = listener.allocReadQueue( remoteIpEndPoint );
+
+                return !reconnect;
+            }
+
+            // we don't have an existing socket, and this is either the first
+            // connection attempt or a reconnect with delay > 0.
+            bool first = true;
+
+            while ( IsStarted() )
+            {
+                // if reconnect is false and first is true, this is our
+                // very first attempt to connect. otherwise, we are trying
+                // to reconnect a broken link or establish a link where we
+                // have already failed at least once.
+                if ( reconnect || !first )
+                {
+                    if ( delay == 0 )
+                        return false;
+
+                    System.Threading.Monitor.Wait( this, delay );
+
+                    if ( !IsStarted() )
+                        break;
+                }
+
+                // try to open a socket.
+                try
+                {
+                    udpClient = new UdpClient( host, port );
+                    return true;
+                }
+                catch ( Exception e )
+                {
+                    if ( first )
+                    {
+                        first = false;
+                        FireException( "open", e );
+                    }
+                }
+            }
+
+            return false;
+        }
+
+        protected override void SetUpSocket()
+        {
+            if ( udpClient != null )
+            {
+                udpClient.EnableBroadcast = enableBroadcast;
+                udpClient.DontFragment = true;
+            }
+        }
+
+        protected override void ReadSocket() // pumps received datagrams to the session while started
+        {
+            try
+            {
+                while ( IsStarted() )
+                {
+                    IPEndPoint senderEndPoint = remoteIpEndPoint;
+                    byte[] receiveBytes = null;
+
+                    if ( readQueue != null )
+                    {
+                        UdpPacket packet = readQueue.Take(); // listener-fed queue: blocks until a packet arrives
+
+                        senderEndPoint = packet.IPEndPoint;
+                        receiveBytes = packet.Bytes;
+                    }
+                    else
+                        receiveBytes = udpClient.Receive( ref senderEndPoint ); // client mode: read own socket
+
+                    WhoSender sender = new WhoSender( senderEndPoint );
+                    FlexBuffer receiveBuf = new FlexBuffer( receiveBytes );
+
+                    session.SessionPacket( sender, receiveBuf );
+                }
+            }
+            catch ( ArgumentNullException )
+            {
+                // ignore
+            }
+            catch ( SocketException ex )
+            {
+                if ( ex.SocketErrorCode != SocketError.Interrupted )
+                    throw; // rethrow without resetting the stack trace (was: throw ex)
+            }
+        }
+
+        protected override void Stop0()
+        {
+            try
+            {
+                Close( false );
+            }
+            catch
+            {
+                // ignore
+            }
+            base.Stop0();
+        }
+
+        public override void Close( bool reset )
+        {
+            if ( listener != null )
+            {
+                listener.releaseReadQueue( remoteIpEndPoint );
+                readQueue = null;
+            }
+            else if ( udpClient != null )
+            {
+                udpClient.Close();
+                udpClient = null;
+            }
+        }
+
+        public override EndPoint LocalAddress()
+        {
+            if ( listener != null )
+                return listener.LocalEndPoint;
+
+            return udpClient.Client.LocalEndPoint;
+        }
+
+        public override EndPoint RemoteAddress()
+        {
+            if (listener != null)
+                return remoteIpEndPoint;
+
+            return udpClient.Client.RemoteEndPoint;
+        }
+        #endregion
+
+        #region TransportPacket Member
+        public int HeaderSize()
+        {
+            return 0;
+        }
+
+        public void TransportPacket( Who recipient, FlexBuffer buf )
+        {
+            byte[] sendBytes = buf.GetAvailBytes();
+
+            if ( listener != null )
+            {
+                IPEndPoint ipe = remoteIpEndPoint;
+
+                if ( recipient != null )
+                {
+                    if (!(recipient is WhoSender))
+                        throw new Exception( "unknown recipient" );
+                    ipe = ( recipient as WhoSender ).sender;
+                }
+
+                if (ipe == null)
+                    throw new Exception( "unknown receiver" );
+
+                listener.Send( sendBytes, sendBytes.Length, ipe );
+            }
+            else
+                udpClient.Send( sendBytes, sendBytes.Length );
+        }
+
+        public override object TransportQuery( object query )
+        {
+            if ( query.Equals( TransportConsts.IS_SERVER ) )
+                return listener != null;
+
+            return base.TransportQuery( query );
+        }
+        #endregion
+
+        private class WhoSender : Who
+        {
+            public IPEndPoint sender;
+
+            public WhoSender( IPEndPoint sender )
+            {
+                this.sender = sender;
+            }
+
+            public override int GetHashCode()
+            {
+                return sender.GetHashCode();
+            }
+
+            public override bool Equals( object obj )
+            {
+                WhoSender whoObj = obj as WhoSender;
+
+                if ( whoObj != null )
+                    return sender.Equals( whoObj.sender );
+
+                return base.Equals( obj );
+            }
+
+            public override string ToString()
+            {
+                return "WhoSender(" + sender + ")";
+            }
+        }
+
+    }
+}

Added: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs?rev=1103543&view=auto
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs (added)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs Sun May 15 21:07:41 2011
@@ -0,0 +1,298 @@
+// $Id$
+// 
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// 
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+
+namespace Org.Apache.Etch.Bindings.Csharp.Util
+{
+    /// <summary>
+    /// Implementation of a connection which handles a socket listener.
+    /// </summary>
+    public class UdpListener : Connection<SessionListener<IPEndPoint>>
+    {
+        /// <summary>
+        /// Query term from URI to specify queueSize value to UdpConnection. The
+        /// value is "UdpListener.queueSize".
+        /// </summary>
+        public const String QUEUE_SIZE = "UdpListener.queueSize";
+        public const string REUSE_PORT = "UdpListener.reusePort";
+
+        private readonly string host;
+        private readonly int port;
+        private readonly bool reusePort;
+        private readonly int delay;
+        private readonly int queueSize;
+
+        private UdpClient udpClient;
+        private IDictionary<IPEndPoint, BlockingCollection<UdpPacket>> readQueues;
+
+        public EndPoint LocalEndPoint { get { return LocalAddress(); } } // null when the socket is not open
+
+        /// <summary>
+        /// Constructs the UdpListener.
+        /// </summary>
+        /// <param name="host">address to listen to. Null means any local address.</param>
+        /// <param name="port">port to listen on. Port must be >= 0. Port of 0
+        /// means allocate an available port.</param>
+        /// <param name="reusePort">pass reuse port flag to the socket.</param>
+        /// <param name="delay">how long in milliseconds to wait before retrying a
+        /// failure. Delay must be >= 0. Delay of 0 means do not retry.</param>
+        /// <param name="queueSize">udp connection packet queue size.</param>
+        public UdpListener( string host, int port, bool reusePort, int delay, int queueSize )
+        {
+            if ( port < 0 || port > 65535 )
+                throw new ArgumentException( "port < 0 || port > 65535" );
+
+            if ( delay < 0 )
+                throw new ArgumentException( "delay < 0" );
+
+            if ( queueSize <= 0 )
+                throw new ArgumentException( "queueSize <= 0" ); // message now matches the guard
+
+            this.host = host;
+            this.port = port;
+            this.reusePort = reusePort;
+            this.delay = delay;
+            this.queueSize = queueSize; // bounded capacity of each per-endpoint read queue
+
+            readQueues = new Dictionary<IPEndPoint, BlockingCollection<UdpPacket>>(); // guarded by lock ( readQueues )
+        }
+
+        public UdpListener( URL uri, Resources resources )
+            : this( TranslateHost( uri.Host ),
+            uri.Port != null ? uri.Port.Value : 0,
+            (bool)uri.GetBooleanTerm( REUSE_PORT, false ),
+            0,
+            (int)uri.GetIntegerTerm( QUEUE_SIZE, 15 ) )
+        {
+            // nothing else.
+        }
+
+        /// <summary>
+        /// Constructs a UdpListener, initialized from the URI. Listens on the host
+        /// and port specified in the URI. To listen on all interfaces, specify
+        /// host as ALL_INTFS ("0.0.0.0"). If port is specified or defaulted to 0,
+        /// an unused port will be selected.
+        /// </summary>
+        /// <param name="uri"></param>
+        public UdpListener( string uri, Resources resources )
+            : this( new URL( uri ), resources )
+        {
+            // nothing else.
+        }
+
+        [MethodImpl( MethodImplOptions.Synchronized )]
+        public int Send( byte[] dgram, int bytes, IPEndPoint endPoint )
+        {
+            if ( udpClient != null )
+                return udpClient.Send( dgram, bytes, endPoint );
+
+            return -1;
+        }
+
+        public BlockingCollection<UdpPacket> allocReadQueue( IPEndPoint endPoint )
+        {
+            lock ( readQueues )
+            {
+                if ( !readQueues.ContainsKey( endPoint ) )
+                    readQueues.Add( endPoint, new BlockingCollection<UdpPacket>( queueSize ) );
+
+                return readQueues[ endPoint ];
+            }
+        }
+
+        private BlockingCollection<UdpPacket> getReadQueue( IPEndPoint endPoint )
+        {
+            lock ( readQueues )
+            {
+                if ( readQueues.ContainsKey( endPoint ) )
+                    return readQueues[ endPoint ];
+            }
+
+            return null;
+        }
+
+        public void releaseReadQueue( IPEndPoint endPoint )
+        {
+            BlockingCollection<UdpPacket> readQueue = null;
+
+            lock ( readQueues )
+            {
+                if ( readQueues.ContainsKey( endPoint ) )
+                {
+                    readQueue = readQueues[ endPoint ];
+                    readQueues.Remove( endPoint );
+                }
+            }
+
+            if ( readQueue != null )
+            {
+                readQueue.CompleteAdding();
+                readQueue.Dispose();
+            }
+        }
+
+        #region Connection<SessionListener<IPEndPoint>> Member
+        [MethodImpl( MethodImplOptions.Synchronized )]
+        protected override bool OpenSocket( bool reconnect )
+        {
+            bool first = true;
+
+            while ( IsStarted() )
+            {
+                if ( reconnect || !first )
+                {
+                    if ( delay == 0 )
+                        return false;
+
+                    System.Threading.Monitor.Wait( this, delay );
+
+                    if ( !IsStarted() )
+                        break;
+                }
+
+                try
+                {
+                    IPAddress addr;
+                    if ( host != null )
+                    {
+                        IPAddress[] addrs = Dns.GetHostAddresses( host );
+                        if ( addrs == null || addrs.Length == 0 )
+                            throw new ArgumentException( "host is invalid" );
+                        addr = addrs[ 0 ];
+                    }
+                    else
+                    {
+                        addr = IPAddress.Any;
+                    }
+
+                    IPEndPoint ipe = new IPEndPoint( addr, port );
+                    udpClient = new UdpClient();
+                    udpClient.Client.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, reusePort );
+                    try
+                    {
+                        udpClient.Client.Bind( ipe );
+                    }
+                    catch ( SocketException e )
+                    {
+                        throw new IOException( "Could not bind address " + host + ":" + port, e );
+                    }
+
+                    return true;
+                }
+                catch ( Exception e )
+                {
+                    if ( first )
+                    {
+                        first = false;
+                        FireException( "open", e );
+                    }
+                }
+            }
+            return false;
+        }
+
+        protected override void SetUpSocket()
+        {
+            udpClient.DontFragment = true;
+        }
+
+        protected override void ReadSocket()
+        {
+            while ( IsStarted() )
+            {
+                IPEndPoint remoteEP = null;
+                byte[] receiveBytes;
+
+                try
+                {
+                    receiveBytes = udpClient.Receive(ref remoteEP);
+
+                    BlockingCollection<UdpPacket> readQueue = getReadQueue( remoteEP );
+
+                    if ( readQueue == null )
+                        session.SessionAccepted( remoteEP );
+
+                    readQueue = getReadQueue( remoteEP );
+                    if ( readQueue != null )
+                        readQueue.TryAdd( new UdpPacket( remoteEP, receiveBytes ) );
+                }
+                catch ( SocketException ex )
+                {
+                    if ( ex.SocketErrorCode != SocketError.ConnectionReset )
+                        throw;
+                }
+                catch ( Exception e )
+                {
+                    FireException( "accepted", e );
+                }
+            }
+        }
+
+        protected override void Stop0()
+        {
+            try
+            {
+                Close( false );
+            }
+            catch
+            {
+                // ignore
+            }
+            base.Stop0();
+        }
+
+        public override void Close( bool reset )
+        {
+            lock ( readQueues )
+            {
+                foreach ( IPEndPoint ipEndPoint in readQueues.Keys.ToArray() )
+                    releaseReadQueue( ipEndPoint );
+            }
+
+            if ( udpClient != null )
+            {
+                udpClient.Close();
+                udpClient = null;
+            }
+        }
+
+        public override EndPoint LocalAddress()
+        {
+            if ( udpClient != null )
+                return udpClient.Client.LocalEndPoint;
+
+            return null;
+        }
+
+        public override EndPoint RemoteAddress()
+        {
+            // ignore
+            return null;
+        }
+        #endregion
+    }
+}

Added: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs?rev=1103543&view=auto
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs (added)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs Sun May 15 21:07:41 2011
@@ -0,0 +1,35 @@
+// $Id$
+// 
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// 
+using System.Net;
+
+namespace Org.Apache.Etch.Bindings.Csharp.Util
+{
+    public class UdpPacket
+    {
+        public IPEndPoint IPEndPoint { get; protected set; }
+        public byte[] Bytes { get; protected set; }
+
+        public UdpPacket( IPEndPoint ipEndPoint, byte[] bytes )
+        {
+            this.IPEndPoint = ipEndPoint;
+            this.Bytes = bytes;
+        }
+    }
+}

Modified: incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs?rev=1103543&r1=1103542&r2=1103543&view=diff
==============================================================================
--- incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs (original)
+++ incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs Sun May 15 21:07:41 2011
@@ -30,6 +30,7 @@ namespace org.apache.etch.examples.hello
 		public static void Main(String[] args)
 		{
 			// TODO: Change to correct URI
+			//string uri = "udp://127.0.0.1:4001";
 			string uri = "tcp://127.0.0.1:4001";
 			
 			RemoteHelloWorldServer server = HelloWorldHelper.NewServer( uri, null, new MainHelloWorldClient());

Modified: incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs?rev=1103543&r1=1103542&r2=1103543&view=diff
==============================================================================
--- incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs (original)
+++ incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs Sun May 15 21:07:41 2011
@@ -32,6 +32,7 @@ namespace org.apache.etch.examples.hello
 		public static void Main(String[] args)
 		{
 			// TODO: Change to correct URI
+			//string uri = "udp://127.0.0.1:4001";
 			string uri = "tcp://127.0.0.1:4001";
 			
 			ServerFactory listener = HelloWorldHelper.NewListener( uri, null, new MainHelloWorldListener());