You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@etch.apache.org by fi...@apache.org on 2011/05/15 23:07:42 UTC
svn commit: r1103543 - in /incubator/etch/trunk:
binding-csharp/runtime/src/main/csharp/
binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/
binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/
b...
Author: fitzner
Date: Sun May 15 21:07:41 2011
New Revision: 1103543
URL: http://svn.apache.org/viewvc?rev=1103543&view=rev
Log:
[ETCH-157]
binding-csharp: add udp transport support
Make udp and tcp protocols seamlessly interchangable in etch
The differences between the underlying protocols remain.
This patch comes from
Armin Mueller <mu...@itestra.com>
Aleksandar Kanchev <ka...@itestra.com>
Added:
incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs
incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs
incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs
incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs
Modified:
incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj
incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs
incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs
incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs
Modified: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj?rev=1103543&r1=1103542&r2=1103543&view=diff
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj (original)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/EtchProj.csproj Sun May 15 21:07:41 2011
@@ -127,6 +127,7 @@
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\TaggedDataOutput.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\TcpTransportFactory.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\TransportMessage.cs" />
+ <Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\UdpTransportFactory.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Transport\UnwantedMessage.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\AbstractStartable.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\AlarmListener.cs" />
@@ -180,6 +181,9 @@
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\TransportConsts.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\TransportData.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\TransportPacket.cs" />
+ <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\UdpConnection.cs" />
+ <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\UdpListener.cs" />
+ <Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\UdpPacket.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\URL.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\URLSerializer.cs" />
<Compile Include="Org.Apache.Etch.Bindings.Csharp\Util\Who.cs" />
Modified: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs?rev=1103543&r1=1103542&r2=1103543&view=diff
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs (original)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Support/TransportFactory.cs Sun May 15 21:07:41 2011
@@ -226,6 +226,7 @@ namespace Org.Apache.Etch.Bindings.Cshar
{
Define("tcp", new TcpTransportFactory(false));
Define("tls", new TcpTransportFactory(true));
+ Define("udp", new UdpTransportFactory());
DefineFilter("KeepAlive", "Org.Apache.Etch.Bindings.Csharp.Transport.Filter.KeepAlive");
DefineFilter("PwAuth", "Org.Apache.Etch.Bindings.Csharp.Transport.Filter.PwAuth");
Added: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs?rev=1103543&view=auto
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs (added)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Transport/UdpTransportFactory.cs Sun May 15 21:07:41 2011
@@ -0,0 +1,152 @@
+// $Id$
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+using System;
+using System.Net;
+using System.Net.Sockets;
+using Org.Apache.Etch.Bindings.Csharp.Msg;
+using Org.Apache.Etch.Bindings.Csharp.Support;
+using Org.Apache.Etch.Bindings.Csharp.Util;
+
+namespace Org.Apache.Etch.Bindings.Csharp.Transport
+{
+ public class UdpTransportFactory : TransportFactory
+ {
+ private const String UDP_LISTENER = "UdpTransportFactory.udpListener";
+ private const String SOCKET_ADDRESS = "UdpTransportFactory.socketAddress";
+
+ public UdpTransportFactory()
+ {
+ }
+
+ protected override TransportMessage NewTransport( string uri, Resources resources )
+ {
+ UdpListener udpListener = resources.Get( UDP_LISTENER ) as UdpListener;
+ IPEndPoint ipEndPoint = resources.Get(SOCKET_ADDRESS) as IPEndPoint;
+
+ URL url = new URL( uri );
+ TransportPacket transportPacket = null;
+
+ if (udpListener != null)
+ transportPacket = new UdpConnection( ipEndPoint, udpListener );
+ else
+ transportPacket = new UdpConnection( url );
+
+ TransportMessage transportMessage = new Messagizer( transportPacket, url, resources );
+
+ transportMessage = AddFilters( transportMessage, url, resources );
+
+ ValueFactory vf = (ValueFactory)resources.Get( TransportConsts.VALUE_FACTORY );
+ vf.LockDynamicTypes();
+
+ return transportMessage;
+ }
+
+ protected override Transport<ServerFactory> NewListener( string uri, Resources resources )
+ {
+ UdpListener transportListener = new UdpListener( uri, resources );
+ return new MySessionListener( this, transportListener, uri, resources );
+ }
+
+ public class MySessionListener : Transport<ServerFactory>, SessionListener<IPEndPoint>
+ {
+ private readonly UdpTransportFactory transportFactory;
+ private readonly UdpListener listener;
+ private readonly string uri;
+ private readonly Resources resources;
+
+ public MySessionListener( UdpTransportFactory transportFactory, UdpListener listener, String uri, Resources resources )
+ {
+ this.transportFactory = transportFactory;
+ this.listener = listener;
+ this.uri = uri;
+ this.resources = resources;
+
+ listener.SetSession( this );
+ }
+
+ public override string ToString()
+ {
+ return "UdpTransportFactory.MySessionListener/" + listener;
+ }
+
+ public void SessionAccepted( IPEndPoint ipEndPoint )
+ {
+ ValueFactory vf = session.NewValueFactory( uri );
+ Resources r = new Resources( resources );
+
+ r.Add( UDP_LISTENER, listener );
+ r.Add( SOCKET_ADDRESS, ipEndPoint);
+ r.Add( TransportConsts.VALUE_FACTORY, vf );
+
+ TransportMessage t = transportFactory.NewTransport( uri, r );
+
+ session.NewServer( t, uri, r );
+ }
+
+ #region Transport<ServerFactory> Members
+ public object TransportQuery( object query )
+ {
+ return listener.TransportQuery( query );
+ }
+
+ public void TransportControl( object control, object value )
+ {
+ listener.TransportControl( control, value );
+ }
+
+ public void TransportNotify( object eventObj )
+ {
+ listener.TransportNotify( eventObj );
+ }
+
+
+ private ServerFactory session;
+
+ public ServerFactory GetSession()
+ {
+ return session;
+ }
+
+ public void SetSession( ServerFactory session )
+ {
+ this.session = session;
+ }
+ #endregion
+
+ #region Session Members
+ public object SessionQuery( object query )
+ {
+ return session.SessionQuery( query );
+ }
+
+ public void SessionControl( object control, object value )
+ {
+ session.SessionControl( control, value );
+ }
+
+ public void SessionNotify( object eventObj )
+ {
+ session.SessionNotify( eventObj );
+ }
+ #endregion
+ }
+
+ }
+}
Added: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs?rev=1103543&view=auto
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs (added)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpConnection.cs Sun May 15 21:07:41 2011
@@ -0,0 +1,304 @@
+// $Id$
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+
+namespace Org.Apache.Etch.Bindings.Csharp.Util
+{
+ public class UdpConnection : Connection<SessionPacket>, TransportPacket
+ {
+ public const string BROADCAST = "UdpConnection.broadcast";
+
+ private IPEndPoint remoteIpEndPoint;
+ private readonly UdpListener listener;
+ private BlockingCollection<UdpPacket> readQueue;
+
+ private readonly int delay;
+ private readonly bool enableBroadcast;
+ private readonly string host;
+ private readonly int port;
+
+ private UdpClient udpClient;
+
+ public UdpConnection( IPEndPoint remoteIpEndPoint, UdpListener listener )
+ {
+ this.remoteIpEndPoint = remoteIpEndPoint;
+ this.listener = listener;
+
+ readQueue = listener.allocReadQueue( remoteIpEndPoint );
+ }
+
+ public UdpConnection( string host, int? port, bool enableBroadcast, int delay )
+ {
+ if ( host == null )
+ throw new ArgumentNullException( "host is missing" );
+ else if ( host == "255.255.255.255" )
+ enableBroadcast = true;
+
+ if ( port == null )
+ throw new ArgumentNullException( "port" );
+ else if ( port <= 0 || port >= 65536 )
+ throw new ArgumentException( "port <= 0 || port >= 65536" );
+
+ this.host = host;
+ this.port = (int)port;
+ this.enableBroadcast = enableBroadcast;
+ this.delay = delay;
+
+ listener = null;
+ }
+
+ public UdpConnection( URL uri )
+ : this(uri.Host, uri.Port, uri.GetBooleanTerm( BROADCAST, false ), 0)
+ {
+ // nothing else.
+ }
+
+ public UdpConnection( string uri )
+ : this( new URL( uri ) )
+ {
+ // nothing else.
+ }
+
+ public override string ToString()
+ {
+ if (listener != null || udpClient != null)
+ return String.Format( "UdpConnection(up, {0}, {1})", LocalAddress(), RemoteAddress() );
+
+ return String.Format( "UdpConnection(down, {0}, {1})", host, port );
+ }
+
+ #region Connection<SessionData> Member
+ [MethodImpl( MethodImplOptions.Synchronized )]
+ protected override bool OpenSocket( bool reconnect )
+ {
+ // if a one time connection from a server socket listener, just
+ // return the existing socket. Bail if this is a reconnect.
+ if ( listener != null )
+ {
+ if ( !reconnect && readQueue == null )
+ readQueue = listener.allocReadQueue( remoteIpEndPoint );
+
+ return !reconnect;
+ }
+
+ // we don't have an existing socket, and this is either the first
+ // connection attempt or a reconnect with delay > 0.
+ bool first = true;
+
+ while ( IsStarted() )
+ {
+ // if reconnect is false and first is true, this is our
+ // very first attempt to connect. otherwise, we are trying
+ // to reconnect a broken link or establish a link where we
+ // have already failed at least once.
+ if ( reconnect || !first )
+ {
+ if ( delay == 0 )
+ return false;
+
+ System.Threading.Monitor.Wait( this, delay );
+
+ if ( !IsStarted() )
+ break;
+ }
+
+ // try to open a socket.
+ try
+ {
+ udpClient = new UdpClient( host, port );
+ return true;
+ }
+ catch ( Exception e )
+ {
+ if ( first )
+ {
+ first = false;
+ FireException( "open", e );
+ }
+ }
+ }
+
+ return false;
+ }
+
+ protected override void SetUpSocket()
+ {
+ if ( udpClient != null )
+ {
+ udpClient.EnableBroadcast = enableBroadcast;
+ udpClient.DontFragment = true;
+ }
+ }
+
+ protected override void ReadSocket()
+ {
+ try
+ {
+ while ( IsStarted() )
+ {
+ IPEndPoint senderEndPoint = remoteIpEndPoint;
+ byte[] receiveBytes = null;
+
+ if ( readQueue != null )
+ {
+ UdpPacket packet = readQueue.Take();
+
+ senderEndPoint = packet.IPEndPoint;
+ receiveBytes = packet.Bytes;
+ }
+ else
+ receiveBytes = udpClient.Receive( ref senderEndPoint );
+
+ WhoSender sender = new WhoSender( senderEndPoint );
+ FlexBuffer receiveBuf = new FlexBuffer( receiveBytes );
+
+ session.SessionPacket( sender, receiveBuf );
+ }
+ }
+ catch ( ArgumentNullException )
+ {
+ // ignore
+ }
+ catch ( SocketException ex )
+ {
+ if ( ex.SocketErrorCode != SocketError.Interrupted )
+ throw ex;
+ }
+ }
+
+ protected override void Stop0()
+ {
+ try
+ {
+ Close( false );
+ }
+ catch
+ {
+ // ignore
+ }
+ base.Stop0();
+ }
+
+ public override void Close( bool reset )
+ {
+ if ( listener != null )
+ {
+ listener.releaseReadQueue( remoteIpEndPoint );
+ readQueue = null;
+ }
+ else if ( udpClient != null )
+ {
+ udpClient.Close();
+ udpClient = null;
+ }
+ }
+
+ public override EndPoint LocalAddress()
+ {
+ if ( listener != null )
+ return listener.LocalEndPoint;
+
+ return udpClient.Client.LocalEndPoint;
+ }
+
+ public override EndPoint RemoteAddress()
+ {
+ if (listener != null)
+ return remoteIpEndPoint;
+
+ return udpClient.Client.RemoteEndPoint;
+ }
+ #endregion
+
+ #region TransportPacket Member
+ public int HeaderSize()
+ {
+ return 0;
+ }
+
+ public void TransportPacket( Who recipient, FlexBuffer buf )
+ {
+ byte[] sendBytes = buf.GetAvailBytes();
+
+ if ( listener != null )
+ {
+ IPEndPoint ipe = remoteIpEndPoint;
+
+ if ( recipient != null )
+ {
+ if (!(recipient is WhoSender))
+ throw new Exception( "unknown recipient" );
+ ipe = ( recipient as WhoSender ).sender;
+ }
+
+ if (ipe == null)
+ throw new Exception( "unknown receiver" );
+
+ listener.Send( sendBytes, sendBytes.Length, ipe );
+ }
+ else
+ udpClient.Send( sendBytes, sendBytes.Length );
+ }
+
+ public override object TransportQuery( object query )
+ {
+ if ( query.Equals( TransportConsts.IS_SERVER ) )
+ return listener != null;
+
+ return base.TransportQuery( query );
+ }
+ #endregion
+
+ private class WhoSender : Who
+ {
+ public IPEndPoint sender;
+
+ public WhoSender( IPEndPoint sender )
+ {
+ this.sender = sender;
+ }
+
+ public override int GetHashCode()
+ {
+ return sender.GetHashCode();
+ }
+
+ public override bool Equals( object obj )
+ {
+ WhoSender whoObj = obj as WhoSender;
+
+ if ( whoObj != null )
+ return sender.Equals( whoObj.sender );
+
+ return base.Equals( obj );
+ }
+
+ public override string ToString()
+ {
+ return "WhoSender(" + sender + ")";
+ }
+ }
+
+ }
+}
Added: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs?rev=1103543&view=auto
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs (added)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpListener.cs Sun May 15 21:07:41 2011
@@ -0,0 +1,298 @@
+// $Id$
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.CompilerServices;
+
+namespace Org.Apache.Etch.Bindings.Csharp.Util
+{
+ /// <summary>
+ /// Implementation of a connection which handles a socket listener.
+ /// </summary>
+ public class UdpListener : Connection<SessionListener<IPEndPoint>>
+ {
+ /// <summary>
+ /// Query term from URI to specify queueSize value to UdpConnection. The
+ /// value is "UdpListener.queueSize".
+ /// </summary>
+ public const String QUEUE_SIZE = "UdpListener.queueSize";
+ public const string REUSE_PORT = "UdpListener.reusePort";
+
+ private readonly string host;
+ private readonly int port;
+ private readonly bool reusePort;
+ private readonly int delay;
+ private readonly int queueSize;
+
+ private UdpClient udpClient;
+ private IDictionary<IPEndPoint, BlockingCollection<UdpPacket>> readQueues;
+
+ public EndPoint LocalEndPoint { get { return udpClient.Client.LocalEndPoint; } }
+
+ /// <summary>
+ /// Constructs the UdpListener.
+ /// </summary>
+ /// <param name="host">address to listen to. Null means any local address.</param>
+ /// <param name="port">port to listen on. Port must be >= 0. Port of 0
+ /// means allocate an available port.</param>
+ /// <param name="reusePort">pass reuse port flag to the socket.</param>
+ /// <param name="delay">how long in milliseconds to wait before retrying a
+ /// failure. Delay must be >= 0. Delay of 0 means do not retry.</param>
+ /// <param name="queueSize">udp connection packet queue size.</param>
+ public UdpListener( string host, int port, bool reusePort, int delay, int queueSize )
+ {
+ if ( port < 0 || port > 65535 )
+ throw new ArgumentException( "port < 0 || port > 65535" );
+
+ if ( delay < 0 )
+ throw new ArgumentException( "delay < 0" );
+
+ if ( queueSize <= 0 )
+ throw new ArgumentException( "queueSize < 0" );
+
+ this.host = host;
+ this.port = port;
+ this.reusePort = reusePort;
+ this.delay = delay;
+ this.queueSize = queueSize;
+
+ readQueues = new Dictionary<IPEndPoint, BlockingCollection<UdpPacket>>();
+ }
+
+ public UdpListener( URL uri, Resources resources )
+ : this( TranslateHost( uri.Host ),
+ uri.Port != null ? uri.Port.Value : 0,
+ (bool)uri.GetBooleanTerm( REUSE_PORT, false ),
+ 0,
+ (int)uri.GetIntegerTerm( QUEUE_SIZE, 15 ) )
+ {
+ // nothing else.
+ }
+
+ /// <summary>
+ /// Constructs a UdpListener, initialized from the URI. Listens on the host
+ /// and port specified in the URI. To listen on all interfaces, specify
+ /// host as ALL_INTFS ("0.0.0.0"). If port is specified or defaulted to 0,
+ /// an unused port will be selected.
+ /// </summary>
+ /// <param name="uri"></param>
+ public UdpListener( string uri, Resources resources )
+ : this( new URL( uri ), resources )
+ {
+ // nothing else.
+ }
+
+ [MethodImpl( MethodImplOptions.Synchronized )]
+ public int Send( byte[] dgram, int bytes, IPEndPoint endPoint )
+ {
+ if ( udpClient != null )
+ return udpClient.Send( dgram, bytes, endPoint );
+
+ return -1;
+ }
+
+ public BlockingCollection<UdpPacket> allocReadQueue( IPEndPoint endPoint )
+ {
+ lock ( readQueues )
+ {
+ if ( !readQueues.ContainsKey( endPoint ) )
+ readQueues.Add( endPoint, new BlockingCollection<UdpPacket>( queueSize ) );
+
+ return readQueues[ endPoint ];
+ }
+ }
+
+ private BlockingCollection<UdpPacket> getReadQueue( IPEndPoint endPoint )
+ {
+ lock ( readQueues )
+ {
+ if ( readQueues.ContainsKey( endPoint ) )
+ return readQueues[ endPoint ];
+ }
+
+ return null;
+ }
+
+ public void releaseReadQueue( IPEndPoint endPoint )
+ {
+ BlockingCollection<UdpPacket> readQueue = null;
+
+ lock ( readQueues )
+ {
+ if ( readQueues.ContainsKey( endPoint ) )
+ {
+ readQueue = readQueues[ endPoint ];
+ readQueues.Remove( endPoint );
+ }
+ }
+
+ if ( readQueue != null )
+ {
+ readQueue.CompleteAdding();
+ readQueue.Dispose();
+ }
+ }
+
+ #region Connection<SessionListener<IPEndPoint>> Member
+ [MethodImpl( MethodImplOptions.Synchronized )]
+ protected override bool OpenSocket( bool reconnect )
+ {
+ bool first = true;
+
+ while ( IsStarted() )
+ {
+ if ( reconnect || !first )
+ {
+ if ( delay == 0 )
+ return false;
+
+ System.Threading.Monitor.Wait( this, delay );
+
+ if ( !IsStarted() )
+ break;
+ }
+
+ try
+ {
+ IPAddress addr;
+ if ( host != null )
+ {
+ IPAddress[] addrs = Dns.GetHostAddresses( host );
+ if ( addrs == null || addrs.Length == 0 )
+ throw new ArgumentException( "host is invalid" );
+ addr = addrs[ 0 ];
+ }
+ else
+ {
+ addr = IPAddress.Any;
+ }
+
+ IPEndPoint ipe = new IPEndPoint( addr, port );
+ udpClient = new UdpClient();
+ udpClient.Client.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, reusePort );
+ try
+ {
+ udpClient.Client.Bind( ipe );
+ }
+ catch ( SocketException e )
+ {
+ throw new IOException( "Could not bind address " + host + ":" + port, e );
+ }
+
+ return true;
+ }
+ catch ( Exception e )
+ {
+ if ( first )
+ {
+ first = false;
+ FireException( "open", e );
+ }
+ }
+ }
+ return false;
+ }
+
+ protected override void SetUpSocket()
+ {
+ udpClient.DontFragment = true;
+ }
+
+ protected override void ReadSocket()
+ {
+ while ( IsStarted() )
+ {
+ IPEndPoint remoteEP = null;
+ byte[] receiveBytes;
+
+ try
+ {
+ receiveBytes = udpClient.Receive(ref remoteEP);
+
+ BlockingCollection<UdpPacket> readQueue = getReadQueue( remoteEP );
+
+ if ( readQueue == null )
+ session.SessionAccepted( remoteEP );
+
+ readQueue = getReadQueue( remoteEP );
+ if ( readQueue != null )
+ readQueue.TryAdd( new UdpPacket( remoteEP, receiveBytes ) );
+ }
+ catch ( SocketException ex )
+ {
+ if ( ex.SocketErrorCode != SocketError.ConnectionReset )
+ throw;
+ }
+ catch ( Exception e )
+ {
+ FireException( "accepted", e );
+ }
+ }
+ }
+
+ protected override void Stop0()
+ {
+ try
+ {
+ Close( false );
+ }
+ catch
+ {
+ // ignore
+ }
+ base.Stop0();
+ }
+
+ public override void Close( bool reset )
+ {
+ lock ( readQueues )
+ {
+ foreach ( IPEndPoint ipEndPoint in readQueues.Keys.ToArray() )
+ releaseReadQueue( ipEndPoint );
+ }
+
+ if ( udpClient != null )
+ {
+ udpClient.Close();
+ udpClient = null;
+ }
+ }
+
+ public override EndPoint LocalAddress()
+ {
+ if ( udpClient != null )
+ return udpClient.Client.LocalEndPoint;
+
+ return null;
+ }
+
+ public override EndPoint RemoteAddress()
+ {
+ // ignore
+ return null;
+ }
+ #endregion
+ }
+}
Added: incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs?rev=1103543&view=auto
==============================================================================
--- incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs (added)
+++ incubator/etch/trunk/binding-csharp/runtime/src/main/csharp/Org.Apache.Etch.Bindings.Csharp/Util/UdpPacket.cs Sun May 15 21:07:41 2011
@@ -0,0 +1,35 @@
+// $Id$
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+using System.Net;
+
+namespace Org.Apache.Etch.Bindings.Csharp.Util
+{
+ public class UdpPacket
+ {
+ public IPEndPoint IPEndPoint { get; protected set; }
+ public byte[] Bytes { get; protected set; }
+
+ public UdpPacket( IPEndPoint ipEndPoint, byte[] bytes )
+ {
+ this.IPEndPoint = ipEndPoint;
+ this.Bytes = bytes;
+ }
+ }
+}
Modified: incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs?rev=1103543&r1=1103542&r2=1103543&view=diff
==============================================================================
--- incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs (original)
+++ incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldClient.cs Sun May 15 21:07:41 2011
@@ -30,6 +30,7 @@ namespace org.apache.etch.examples.hello
public static void Main(String[] args)
{
// TODO: Change to correct URI
+ //string uri = "udp://127.0.0.1:4001";
string uri = "tcp://127.0.0.1:4001";
RemoteHelloWorldServer server = HelloWorldHelper.NewServer( uri, null, new MainHelloWorldClient());
Modified: incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs
URL: http://svn.apache.org/viewvc/incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs?rev=1103543&r1=1103542&r2=1103543&view=diff
==============================================================================
--- incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs (original)
+++ incubator/etch/trunk/examples/helloworld/src/main/csharp/org.apache.etch.examples.helloworld/MainHelloWorldListener.cs Sun May 15 21:07:41 2011
@@ -32,6 +32,7 @@ namespace org.apache.etch.examples.hello
public static void Main(String[] args)
{
// TODO: Change to correct URI
+ //string uri = "udp://127.0.0.1:4001";
string uri = "tcp://127.0.0.1:4001";
ServerFactory listener = HelloWorldHelper.NewListener( uri, null, new MainHelloWorldListener());