You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/07 23:00:57 UTC
[1/2] incubator-reef git commit: [REEF-422] Convert Wake Layer from
Writable to Streaming
Repository: incubator-reef
Updated Branches:
refs/heads/master ec9b497d4 -> ba2653d6b
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
new file mode 100644
index 0000000..6eb0190
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Manages incoming and outgoing messages between remote hosts.
+ /// </summary>
+ /// <typeparam name="T">Message type T.</typeparam>
+ internal sealed class StreamingRemoteManager<T> : IRemoteManager<T>
+ {
+ private readonly ObserverContainer<T> _observerContainer;
+ private readonly StreamingTransportServer<IRemoteEvent<T>> _server;
+ private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+ private readonly IStreamingCodec<IRemoteEvent<T>> _remoteEventCodec;
+
+ /// <summary>
+ /// Constructs a DefaultRemoteManager listening on the specified address and
+ /// a specific port.
+ /// </summary>
+ /// <param name="localAddress">The address to listen on</param>
+ /// <param name="tcpPortProvider">Tcp port provider</param>
+ /// <param name="streamingCodec">Streaming codec</param>
+ [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
+ public StreamingRemoteManager(IPAddress localAddress, ITcpPortProvider tcpPortProvider, IStreamingCodec<T> streamingCodec)
+ {
+ if (localAddress == null)
+ {
+ throw new ArgumentNullException("localAddress");
+ }
+
+ _observerContainer = new ObserverContainer<T>();
+ _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+ _remoteEventCodec = new RemoteEventStreamingCodec<T>(streamingCodec);
+
+ // Begin to listen for incoming messages
+ _server = new StreamingTransportServer<IRemoteEvent<T>>(localAddress, _observerContainer, tcpPortProvider, _remoteEventCodec);
+ _server.Run();
+
+ LocalEndpoint = _server.LocalEndpoint;
+ Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+ }
+
+ /// <summary>
+ /// Gets the RemoteIdentifier for the DefaultRemoteManager
+ /// </summary>
+ public IRemoteIdentifier Identifier { get; private set; }
+
+ /// <summary>
+ /// Gets the local IPEndPoint for the DefaultRemoteManager
+ /// </summary>
+ public IPEndPoint LocalEndpoint { get; private set; }
+
+ /// <summary>
+ /// Returns an IObserver used to send messages to the remote host at
+ /// the specified IPEndpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+ /// <returns>An IObserver used to send messages to the remote host</returns>
+ public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+ if (id == null)
+ {
+ throw new ArgumentException("ID not supported");
+ }
+
+ return GetRemoteObserver(id.Addr);
+ }
+
+ /// <summary>
+ /// Returns an IObserver used to send messages to the remote host at
+ /// the specified IPEndpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+ /// <returns>An IObserver used to send messages to the remote host</returns>
+ public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ ProxyObserver remoteObserver;
+ if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
+ {
+ StreamingTransportClient<IRemoteEvent<T>> client =
+ new StreamingTransportClient<IRemoteEvent<T>>(remoteEndpoint, _observerContainer, _remoteEventCodec);
+
+ remoteObserver = new ProxyObserver(client);
+ _cachedClients[remoteEndpoint] = remoteObserver;
+ }
+
+ return remoteObserver;
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ if (observer == null)
+ {
+ throw new ArgumentNullException("observer");
+ }
+
+ SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+ if (id == null)
+ {
+ throw new ArgumentException("ID not supported");
+ }
+
+ return RegisterObserver(id.Addr, observer);
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+ if (observer == null)
+ {
+ throw new ArgumentNullException("observer");
+ }
+
+ return _observerContainer.RegisterObserver(remoteEndpoint, observer);
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+ {
+ if (observer == null)
+ {
+ throw new ArgumentNullException("observer");
+ }
+
+ return _observerContainer.RegisterObserver(observer);
+ }
+
+ /// <summary>
+ /// Release all resources for the DefaultRemoteManager.
+ /// </summary>
+ public void Dispose()
+ {
+ foreach (ProxyObserver cachedClient in _cachedClients.Values)
+ {
+ cachedClient.Dispose();
+ }
+
+ if (_server != null)
+ {
+ _server.Dispose();
+ }
+ }
+
+ /// <summary>
+ /// Observer to send messages to connected remote host
+ /// </summary>
+ private class ProxyObserver : IObserver<T>, IDisposable
+ {
+ private readonly StreamingTransportClient<IRemoteEvent<T>> _client;
+
+ /// <summary>
+ /// Create new ProxyObserver
+ /// </summary>
+ /// <param name="client">The connected WritableTransport client used to send
+ /// messages to remote host</param>
+ public ProxyObserver(StreamingTransportClient<IRemoteEvent<T>> client)
+ {
+ _client = client;
+ }
+
+ /// <summary>
+ /// Send the message to the remote host
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ public void OnNext(T message)
+ {
+ IRemoteEvent<T> remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint,
+ _client.Link.RemoteEndpoint,
+ message);
+
+ _client.Send(remoteEvent);
+ }
+
+ /// <summary>
+ /// Close underlying WritableTransport client
+ /// </summary>
+ public void Dispose()
+ {
+ _client.Dispose();
+ }
+
+ public void OnError(Exception error)
+ {
+ throw error;
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
new file mode 100644
index 0000000..90e3aca
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// StreamingRemoteManagerFactory for StreamingRemoteManager.
+ /// </summary>
+ public sealed class StreamingRemoteManagerFactory
+ {
+ private readonly ITcpPortProvider _tcpPortProvider;
+ private readonly IInjector _injector;
+
+ [Inject]
+ private StreamingRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector)
+ {
+ _tcpPortProvider = tcpPortProvider;
+ _injector = injector;
+ }
+
+ //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447]
+ public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+ {
+#pragma warning disable 618
+// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+ var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>();
+ return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec);
+#pragma warning disable 618
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
new file mode 100644
index 0000000..111be5d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Establish connections to TransportServer for remote message passing
+ /// </summary>
+ /// <typeparam name="T">Generic Type of message.</typeparam>
+ internal sealed class StreamingTransportClient<T> : IDisposable
+ {
+ private readonly ILink<T> _link;
+ private readonly IObserver<TransportEvent<T>> _observer;
+ private readonly CancellationTokenSource _cancellationSource;
+ private bool _disposed;
+ private static readonly Logger Logger = Logger.GetLogger(typeof(StreamingTransportClient<T>));
+
+ /// <summary>
+ /// Construct a TransportClient.
+ /// Used to send messages to the specified remote endpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+ /// <param name="streamingCodec">Streaming codec</param>
+ internal StreamingTransportClient(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec)
+ {
+ Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);
+
+ _link = new StreamingLink<T>(remoteEndpoint, streamingCodec);
+ _cancellationSource = new CancellationTokenSource();
+ _disposed = false;
+ }
+
+ /// <summary>
+ /// Construct a TransportClient.
+ /// Used to send messages to the specified remote endpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+ /// <param name="observer">Callback used when receiving responses from remote host</param>
+ /// <param name="streamingCodec">Streaming codec</param>
+ internal StreamingTransportClient(IPEndPoint remoteEndpoint,
+ IObserver<TransportEvent<T>> observer,
+ IStreamingCodec<T> streamingCodec)
+ : this(remoteEndpoint, streamingCodec)
+ {
+ _observer = observer;
+ Task.Run(() => ResponseLoop());
+ }
+
+ /// <summary>
+ /// Gets the underlying transport link.
+ /// </summary>
+ public ILink<T> Link
+ {
+ get { return _link; }
+ }
+
+ /// <summary>
+ /// Send the remote message.
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ public void Send(T message)
+ {
+ if (message == null)
+ {
+ throw new ArgumentNullException("message");
+ }
+
+ _link.Write(message);
+ }
+
+ /// <summary>
+ /// Close all opened connections
+ /// </summary>
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ _link.Dispose();
+ _disposed = true;
+ }
+ }
+
+ /// <summary>
+ /// Continually read responses from remote host
+ /// </summary>
+ private async Task ResponseLoop()
+ {
+ while (!_cancellationSource.IsCancellationRequested)
+ {
+ T message = await _link.ReadAsync(_cancellationSource.Token);
+ if (message == null)
+ {
+ break;
+ }
+
+ TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
+ _observer.OnNext(transportEvent);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
new file mode 100644
index 0000000..3f05c17
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Server to handle incoming remote messages.
+ /// </summary>
+ /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
+ internal sealed class StreamingTransportServer<T> : IDisposable
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof (TransportServer<>));
+
+ private TcpListener _listener;
+ private readonly CancellationTokenSource _cancellationSource;
+ private readonly IObserver<TransportEvent<T>> _remoteObserver;
+ private readonly ITcpPortProvider _tcpPortProvider;
+ private readonly IStreamingCodec<T> _streamingCodec;
+ private bool _disposed;
+ private Task _serverTask;
+
+ /// <summary>
+ /// Constructs a TransportServer to listen for remote events.
+ /// Listens on the specified remote endpoint. When it recieves a remote
+ /// event, it will envoke the specified remote handler.
+ /// </summary>
+ /// <param name="address">Endpoint addres to listen on</param>
+ /// <param name="remoteHandler">The handler to invoke when receiving incoming
+ /// remote messages</param>
+ /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
+ /// <param name="streamingCodec">Streaming codec</param>
+ internal StreamingTransportServer(
+ IPAddress address,
+ IObserver<TransportEvent<T>> remoteHandler,
+ ITcpPortProvider tcpPortProvider,
+ IStreamingCodec<T> streamingCodec)
+ {
+ _listener = new TcpListener(address, 0);
+ _remoteObserver = remoteHandler;
+ _tcpPortProvider = tcpPortProvider;
+ _cancellationSource = new CancellationTokenSource();
+ _cancellationSource.Token.ThrowIfCancellationRequested();
+ _streamingCodec = streamingCodec;
+ _disposed = false;
+ }
+
+ /// <summary>
+ /// Returns the listening endpoint for the TransportServer
+ /// </summary>
+ public IPEndPoint LocalEndpoint
+ {
+ get { return _listener.LocalEndpoint as IPEndPoint; }
+ }
+
+ /// <summary>
+ /// Starts listening for incoming remote messages.
+ /// </summary>
+ public void Run()
+ {
+ FindAPortAndStartListener();
+ _serverTask = Task.Run(() => StartServer());
+ }
+
+ private void FindAPortAndStartListener()
+ {
+ var foundAPort = false;
+ var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
+ for (var enumerator = _tcpPortProvider.GetEnumerator();
+ !foundAPort && enumerator.MoveNext();
+ )
+ {
+ _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
+ try
+ {
+ _listener.Start();
+ foundAPort = true;
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ }
+ if (!foundAPort)
+ {
+ Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
+ }
+ LOGGER.Log(Level.Info,
+ String.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
+ }
+
+
+ /// <summary>
+ /// Close the TransportServer and all open connections
+ /// </summary>
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ _cancellationSource.Cancel();
+
+ try
+ {
+ _listener.Stop();
+ }
+ catch (SocketException)
+ {
+ LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
+ }
+
+ if (_serverTask != null)
+ {
+ _serverTask.Wait();
+
+ // Give the TransportServer Task 500ms to shut down, ignore any timeout errors
+ try
+ {
+ CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500);
+ _serverTask.Wait(serverDisposeTimeout.Token);
+ }
+ catch (Exception e)
+ {
+ Console.Error.WriteLine(e);
+ }
+ finally
+ {
+ _serverTask.Dispose();
+ }
+ }
+ }
+
+ _disposed = true;
+ }
+
+ /// <summary>
+ /// Helper method to start TransportServer. This will
+ /// be run in an asynchronous Task.
+ /// </summary>
+ /// <returns>An asynchronous Task for the running server.</returns>
+ private async Task StartServer()
+ {
+ try
+ {
+ while (!_cancellationSource.Token.IsCancellationRequested)
+ {
+ TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+ ProcessClient(client).Forget();
+ }
+ }
+ catch (InvalidOperationException)
+ {
+ LOGGER.Log(Level.Info, "TransportServer has been closed.");
+ }
+ catch (OperationCanceledException)
+ {
+ LOGGER.Log(Level.Info, "TransportServer has been closed.");
+ }
+ }
+
+ /// <summary>
+ /// Recieves event from connected TcpClient and invokes handler on the event.
+ /// </summary>
+ /// <param name="client">The connected client</param>
+ private async Task ProcessClient(TcpClient client)
+ {
+ // Keep reading messages from client until they disconnect or timeout
+ CancellationToken token = _cancellationSource.Token;
+ using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec))
+ {
+ while (!token.IsCancellationRequested)
+ {
+ T message = await link.ReadAsync(token);
+
+ if (message == null)
+ {
+ break;
+ }
+
+ TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
+ _remoteObserver.OnNext(transportEvent);
+ }
+ LOGGER.Log(Level.Error,
+ "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
new file mode 100644
index 0000000..1af9b86
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ // TODO: This class will be removed shortly and is used to reduce the size of pull request.
+ internal sealed class TemporaryWritableToStreamingCodec<T> : IStreamingCodec<T> where T:IWritable
+ {
+ private readonly IInjector _injector;
+
+ [Inject]
+ public TemporaryWritableToStreamingCodec(IInjector injector)
+ {
+ _injector = injector;
+ }
+
+ public T Read(IDataReader reader)
+ {
+ string type = reader.ReadString();
+ var value = (T) _injector.ForkInjector().GetInstance(type);
+ value.Read(reader);
+ return value;
+ }
+
+ public void Write(T obj, IDataWriter writer)
+ {
+ writer.WriteString(obj.GetType().AssemblyQualifiedName);
+ obj.Write(writer);
+ }
+
+ public async Task<T> ReadAsync(IDataReader reader, CancellationToken token)
+ {
+ string type = await reader.ReadStringAsync(token);
+ var value = (T) _injector.ForkInjector().GetInstance(type);
+ await value.ReadAsync(reader, token);
+ return value;
+ }
+
+ public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token)
+ {
+ await writer.WriteStringAsync(obj.GetType().AssemblyQualifiedName, token);
+ await obj.WriteAsync(writer, token);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
deleted file mode 100644
index 9859338..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
- /// <summary>
- /// Represents an open connection between remote hosts. This class is not thread safe
- /// </summary>
- /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableLink<T> : ILink<T> where T : IWritable
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof (WritableLink<T>));
-
- private readonly IPEndPoint _localEndpoint;
- private bool _disposed;
- private readonly NetworkStream _stream;
- private readonly IInjector _injector;
-
-
- /// <summary>
- /// Stream reader to be passed to IWritable
- /// </summary>
- private readonly StreamDataReader _reader;
-
- /// <summary>
- /// Stream writer from which to read from IWritable
- /// </summary>
- private readonly StreamDataWriter _writer;
-
- /// <summary>
- /// Constructs a Link object.
- /// Connects to the specified remote endpoint.
- /// </summary>
- /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
- /// <param name="injector">The injector to pass arguments to incoming messages</param>
- public WritableLink(IPEndPoint remoteEndpoint, IInjector injector)
- {
- if (remoteEndpoint == null)
- {
- throw new ArgumentNullException("remoteEndpoint");
- }
-
- Client = new TcpClient();
- Client.Connect(remoteEndpoint);
-
- _stream = Client.GetStream();
- _localEndpoint = GetLocalEndpoint();
- _disposed = false;
- _reader = new StreamDataReader(_stream);
- _writer = new StreamDataWriter(_stream);
- _injector = injector;
- }
-
- /// <summary>
- /// Constructs a Link object.
- /// Uses the already connected TcpClient.
- /// </summary>
- /// <param name="client">The already connected client</param>
- /// <param name="injector">The injector to pass arguments to incoming messages</param>
- public WritableLink(TcpClient client, IInjector injector)
- {
- if (client == null)
- {
- throw new ArgumentNullException("client");
- }
-
- Client = client;
- _stream = Client.GetStream();
- _localEndpoint = GetLocalEndpoint();
- _disposed = false;
- _reader = new StreamDataReader(_stream);
- _writer = new StreamDataWriter(_stream);
- _injector = injector;
- }
-
- /// <summary>
- /// Returns the local socket address
- /// </summary>
- public IPEndPoint LocalEndpoint
- {
- get { return _localEndpoint; }
- }
-
- /// <summary>
- /// Returns the remote socket address
- /// </summary>
- public IPEndPoint RemoteEndpoint
- {
- get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
- }
-
- /// <summary>
- /// Gets the underlying TcpClient
- /// </summary>
- public TcpClient Client { get; private set; }
-
- /// <summary>
- /// Writes the message to the remote host
- /// </summary>
- /// <param name="value">The data to write</param>
- public void Write(T value)
- {
- if (value == null)
- {
- throw new ArgumentNullException("value");
- }
- if (_disposed)
- {
- Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
- }
-
- _writer.WriteString(value.GetType().AssemblyQualifiedName);
- value.Write(_writer);
- }
-
- /// <summary>
- /// Writes the value to this link asynchronously
- /// </summary>
- /// <param name="value">The data to write</param>
- /// <param name="token">The cancellation token</param>
- public async Task WriteAsync(T value, CancellationToken token)
- {
- if (_disposed)
- {
- Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
- }
-
- await _writer.WriteStringAsync(value.GetType().AssemblyQualifiedName, token);
- await value.WriteAsync(_writer, token);
- }
-
- /// <summary>
- /// Reads the value from the link synchronously
- /// </summary>
- public T Read()
- {
- if (_disposed)
- {
- Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
- }
-
- string dataType = _reader.ReadString();
-
- if (dataType == null)
- {
- return default(T);
- }
-
- try
- {
- T value = (T) _injector.ForkInjector().GetInstance(dataType);
- value.Read(_reader);
- return value;
- }
- catch (InjectionException)
- {
- return default(T);
- }
- }
-
- /// <summary>
- /// Reads the value from the link asynchronously
- /// </summary>
- /// <param name="token">The cancellation token</param>
- public async Task<T> ReadAsync(CancellationToken token)
- {
- if (_disposed)
- {
- Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
- }
-
- string dataType = "";
-
- dataType = await _reader.ReadStringAsync(token);
-
- if (dataType == null)
- {
- return default(T);
- }
-
- try
- {
- T value = (T) _injector.ForkInjector().GetInstance(dataType);
- await value.ReadAsync(_reader, token);
- return value;
- }
- catch (InjectionException)
- {
- return default(T);
- }
- }
-
- /// <summary>
- /// Close the client connection
- /// </summary>
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- /// <summary>
- /// Subclasses of Links should overwrite this to handle disposing
- /// of the link
- /// </summary>
- /// <param name="disposing">To dispose or not</param>
- public virtual void Dispose(bool disposing)
- {
- if (_disposed)
- {
- return;
- }
-
- if (disposing)
- {
- try
- {
- Client.GetStream().Close();
- }
- catch (InvalidOperationException)
- {
- Logger.Log(Level.Warning, "failed to close stream on a non-connected socket.");
- }
-
- Client.Close();
- }
- _disposed = true;
- }
-
- /// <summary>
- /// Overrides Equals. Two Link objects are equal if they are connected
- /// to the same remote endpoint.
- /// </summary>
- /// <param name="obj">The object to compare</param>
- /// <returns>True if the object is equal to this Link, otherwise false</returns>
- public override bool Equals(object obj)
- {
- Link<T> other = obj as Link<T>;
- if (other == null)
- {
- return false;
- }
-
- return other.RemoteEndpoint.Equals(RemoteEndpoint);
- }
-
- /// <summary>
- /// Gets the hash code for the Link object.
- /// </summary>
- /// <returns>The object's hash code</returns>
- public override int GetHashCode()
- {
- return RemoteEndpoint.GetHashCode();
- }
-
- /// <summary>
- /// Discovers the IPEndpoint for the current machine.
- /// </summary>
- /// <returns>The local IPEndpoint</returns>
- private IPEndPoint GetLocalEndpoint()
- {
- IPAddress address = NetworkUtils.LocalIPAddress;
- int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
- return new IPEndPoint(address, port);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
deleted file mode 100644
index 9790e3e..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Net;
-using Org.Apache.REEF.Wake.Util;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
- /// <summary>
- /// Stores registered IObservers for DefaultRemoteManager.
- /// Can register and look up IObservers by remote IPEndPoint.
- /// </summary>
- /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- internal class WritableObserverContainer<T> : IObserver<TransportEvent<IWritableRemoteEvent<T>>> where T : IWritable
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof(WritableObserverContainer<>));
- private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
- private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
- private IObserver<T> _universalObserver;
-
- /// <summary>
- /// Constructs a new ObserverContainer used to manage remote IObservers.
- /// </summary>
- public WritableObserverContainer()
- {
- _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
- _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
- }
-
- /// <summary>
- /// Registers an IObserver used to handle incoming messages from the remote host
- /// at the specified IPEndPoint.
- /// </summary>
- /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
- /// <param name="observer">The IObserver to handle incoming messages</param>
- /// <returns>An IDisposable used to unregister the observer with</returns>
- public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
- {
- if (remoteEndpoint.Address.Equals(IPAddress.Any))
- {
- _universalObserver = observer;
- return Disposable.Create(() => { _universalObserver = null; });
- }
-
- _endpointMap[remoteEndpoint] = observer;
- return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer));
- }
-
- /// <summary>
- /// Registers an IObserver to handle incoming messages from a remote host
- /// </summary>
- /// <param name="observer">The IObserver to handle incoming messages</param>
- /// <returns>An IDisposable used to unregister the observer with</returns>
- public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
- {
- _typeMap[typeof(T)] = observer;
- return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
- }
-
- /// <summary>
- /// Look up the IObserver for the registered IPEndPoint or event type
- /// and execute the IObserver.
- /// </summary>
- /// <param name="transportEvent">The incoming remote event</param>
- public void OnNext(TransportEvent<IWritableRemoteEvent<T>> transportEvent)
- {
- IWritableRemoteEvent<T> remoteEvent = transportEvent.Data;
- remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
- remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
- T value = remoteEvent.Value;
- bool handled = false;
-
- IObserver<T> observer1;
- IObserver<IRemoteMessage<T>> observer2;
- if (_universalObserver != null)
- {
- _universalObserver.OnNext(value);
- handled = true;
- }
- if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1))
- {
- // IObserver was registered by IPEndpoint
- observer1.OnNext(value);
- handled = true;
- }
- else if (_typeMap.TryGetValue(value.GetType(), out observer2))
- {
- // IObserver was registered by event type
- IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
- IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
- observer2.OnNext(remoteMessage);
- handled = true;
- }
-
- if (!handled)
- {
- throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
- }
- }
-
- public void OnError(Exception error)
- {
- throw error;
- }
-
- public void OnCompleted()
- {
- Logger.Log(Level.Info, "Exiting the Writable Observer Container");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
deleted file mode 100644
index b3664d0..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
- /// <summary>
- /// Writable remote event class
- /// </summary>
- /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> where T : IWritable
- {
- private readonly IInjector _injector;
-
- /// <summary>
- /// Creates the Remote Event
- /// </summary>
- /// <param name="localEndpoint">Local Address</param>
- /// <param name="remoteEndpoint">Remote Address</param>
- /// <param name="value">Actual message</param>
- public WritableRemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
- {
- LocalEndPoint = localEndpoint;
- RemoteEndPoint = remoteEndpoint;
- Value = value;
- }
-
- /// <summary>
- /// Creates empty Remote Event
- /// </summary>
- [Inject]
- public WritableRemoteEvent(IInjector injector)
- {
- _injector = injector;
- }
-
- /// <summary>
- /// Local Address
- /// </summary>
- public IPEndPoint LocalEndPoint { get; set; }
-
- /// <summary>
- /// Remote Address
- /// </summary>
- public IPEndPoint RemoteEndPoint { get; set; }
-
- /// <summary>
- /// The actual message
- /// </summary>
- public T Value { get; set; }
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- public void Read(IDataReader reader)
- {
- Value = (T)_injector.ForkInjector().GetInstance(typeof(T));
- Value.Read(reader);
- }
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- public void Write(IDataWriter writer)
- {
- Value.Write(writer);
- }
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- /// <param name="token">The cancellation token</param>
- public async Task ReadAsync(IDataReader reader, CancellationToken token)
- {
- Value = (T)_injector.ForkInjector().GetInstance(typeof(T));
- await Value.ReadAsync(reader, token);
- }
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">The cancellation token</param>
- public async Task WriteAsync(IDataWriter writer, CancellationToken token)
- {
- await Value.WriteAsync(writer, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
deleted file mode 100644
index 73d8bb6..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
- /// <summary>
- /// Manages incoming and outgoing messages between remote hosts.
- /// </summary>
- /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public sealed class WritableRemoteManager<T> : IRemoteManager<T> where T : IWritable
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof (WritableRemoteManager<T>));
-
- private readonly WritableObserverContainer<T> _observerContainer;
- private readonly WritableTransportServer<IWritableRemoteEvent<T>> _server;
- private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
- private readonly IInjector _injector;
-
- /// <summary>
- /// Constructs a DefaultRemoteManager listening on the specified address and
- /// a specific port.
- /// </summary>
- /// <param name="localAddress">The address to listen on</param>
- /// <param name="port">The port to listen on</param>
- /// <param name="tcpPortProvider">Tcp port provider</param>
- /// <param name="injector">The injector to pass arguments to incoming messages</param>
- [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
- public WritableRemoteManager(IPAddress localAddress, int port, ITcpPortProvider tcpPortProvider, IInjector injector)
- {
- if (localAddress == null)
- {
- throw new ArgumentNullException("localAddress");
- }
- if (port < 0)
- {
- throw new ArgumentException("Listening port must be greater than or equal to zero");
- }
-
- _observerContainer = new WritableObserverContainer<T>();
- _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
- _injector = injector;
-
- IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
-
- // Begin to listen for incoming messages
- _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer, tcpPortProvider, injector);
- _server.Run();
-
- LocalEndpoint = _server.LocalEndpoint;
- Identifier = new SocketRemoteIdentifier(LocalEndpoint);
- }
-
- /// <summary>
- /// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
- /// </summary>
- /// <param name="injector">The injector to pass arguments to incoming messages</param>
- [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
- public WritableRemoteManager(IInjector injector)
- {
- using (LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager"))
- {
- _observerContainer = new WritableObserverContainer<T>();
- _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
- _injector = injector;
-
- LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
- Identifier = new SocketRemoteIdentifier(LocalEndpoint);
- }
- }
-
- /// <summary>
- /// Gets the RemoteIdentifier for the DefaultRemoteManager
- /// </summary>
- public IRemoteIdentifier Identifier { get; private set; }
-
- /// <summary>
- /// Gets the local IPEndPoint for the DefaultRemoteManager
- /// </summary>
- public IPEndPoint LocalEndpoint { get; private set; }
-
- /// <summary>
- /// Returns an IObserver used to send messages to the remote host at
- /// the specified IPEndpoint.
- /// </summary>
- /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
- /// <returns>An IObserver used to send messages to the remote host</returns>
- public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
- {
- if (remoteEndpoint == null)
- {
- throw new ArgumentNullException("remoteEndpoint");
- }
-
- SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
- if (id == null)
- {
- throw new ArgumentException("ID not supported");
- }
-
- return GetRemoteObserver(id.Addr);
- }
-
- /// <summary>
- /// Returns an IObserver used to send messages to the remote host at
- /// the specified IPEndpoint.
- /// </summary>
- /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
- /// <returns>An IObserver used to send messages to the remote host</returns>
- public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
- {
- if (remoteEndpoint == null)
- {
- throw new ArgumentNullException("remoteEndpoint");
- }
-
- ProxyObserver remoteObserver;
- if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
- {
- WritableTransportClient<IWritableRemoteEvent<T>> client =
- new WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, _observerContainer, _injector);
-
- remoteObserver = new ProxyObserver(client);
- _cachedClients[remoteEndpoint] = remoteObserver;
- }
-
- return remoteObserver;
- }
-
- /// <summary>
- /// Registers an IObserver used to handle incoming messages from the remote host
- /// at the specified IPEndPoint.
- /// The IDisposable that is returned can be used to unregister the IObserver.
- /// </summary>
- /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
- /// <param name="observer">The IObserver to handle incoming messages</param>
- /// <returns>An IDisposable used to unregister the observer with</returns>
- public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
- {
- if (remoteEndpoint == null)
- {
- throw new ArgumentNullException("remoteEndpoint");
- }
-
- if (observer == null)
- {
- throw new ArgumentNullException("observer");
- }
-
- SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
- if (id == null)
- {
- throw new ArgumentException("ID not supported");
- }
-
- return RegisterObserver(id.Addr, observer);
- }
-
- /// <summary>
- /// Registers an IObserver used to handle incoming messages from the remote host
- /// at the specified IPEndPoint.
- /// The IDisposable that is returned can be used to unregister the IObserver.
- /// </summary>
- /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
- /// <param name="observer">The IObserver to handle incoming messages</param>
- /// <returns>An IDisposable used to unregister the observer with</returns>
- public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
- {
- if (remoteEndpoint == null)
- {
- throw new ArgumentNullException("remoteEndpoint");
- }
- if (observer == null)
- {
- throw new ArgumentNullException("observer");
- }
-
- return _observerContainer.RegisterObserver(remoteEndpoint, observer);
- }
-
- /// <summary>
- /// Registers an IObserver used to handle incoming messages from the remote host
- /// at the specified IPEndPoint.
- /// The IDisposable that is returned can be used to unregister the IObserver.
- /// </summary>
- /// <param name="observer">The IObserver to handle incoming messages</param>
- /// <returns>An IDisposable used to unregister the observer with</returns>
- public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
- {
- if (observer == null)
- {
- throw new ArgumentNullException("observer");
- }
-
- return _observerContainer.RegisterObserver(observer);
- }
-
- /// <summary>
- /// Release all resources for the DefaultRemoteManager.
- /// </summary>
- public void Dispose()
- {
- foreach (ProxyObserver cachedClient in _cachedClients.Values)
- {
- cachedClient.Dispose();
- }
-
- if (_server != null)
- {
- _server.Dispose();
- }
- }
-
- /// <summary>
- /// Observer to send messages to connected remote host
- /// </summary>
- private class ProxyObserver : IObserver<T>, IDisposable
- {
- private readonly WritableTransportClient<IWritableRemoteEvent<T>> _client;
-
- /// <summary>
- /// Create new ProxyObserver
- /// </summary>
- /// <param name="client">The connected WritableTransport client used to send
- /// messages to remote host</param>
- public ProxyObserver(WritableTransportClient<IWritableRemoteEvent<T>> client)
- {
- _client = client;
- }
-
- /// <summary>
- /// Send the message to the remote host
- /// </summary>
- /// <param name="message">The message to send</param>
- public void OnNext(T message)
- {
- IWritableRemoteEvent<T> remoteEvent = new WritableRemoteEvent<T>(_client.Link.LocalEndpoint,
- _client.Link.RemoteEndpoint,
- message);
-
- _client.Send(remoteEvent);
- }
-
- /// <summary>
- /// Close underlying WritableTransport client
- /// </summary>
- public void Dispose()
- {
- _client.Dispose();
- }
-
- public void OnError(Exception error)
- {
- throw error;
- }
-
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
deleted file mode 100644
index 52fef8d..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
- /// <summary>
- /// WritableRemoteManagerFactory for WritableRemoteManager.
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public sealed class WritableRemoteManagerFactory
- {
- private readonly ITcpPortProvider _tcpPortProvider;
- private readonly IInjector _injector;
-
- [Inject]
- private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector)
- {
- _tcpPortProvider = tcpPortProvider;
- _injector = injector;
- }
-
- public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
- {
-#pragma warning disable 618
-// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
- return new WritableRemoteManager<T>(localAddress, port, _tcpPortProvider, _injector);
-#pragma warning disable 618
- }
-
- public IRemoteManager<T> GetInstance<T>() where T : IWritable
- {
-#pragma warning disable 618
-// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
- return new WritableRemoteManager<T>(_injector);
-#pragma warning disable 618
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
deleted file mode 100644
index b245f0f..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
- /// <summary>
- /// Establish connections to TransportServer for remote message passing
- /// </summary>
- /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableTransportClient<T> : IDisposable where T : IWritable
- {
- private readonly ILink<T> _link;
- private readonly IObserver<TransportEvent<T>> _observer;
- private readonly CancellationTokenSource _cancellationSource;
- private bool _disposed;
- private static readonly Logger Logger = Logger.GetLogger(typeof(WritableTransportClient<T>));
-
- /// <summary>
- /// Construct a TransportClient.
- /// Used to send messages to the specified remote endpoint.
- /// </summary>
- /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
- /// <param name="injector">The injector to pass arguments to incoming messages</param>
- public WritableTransportClient(IPEndPoint remoteEndpoint, IInjector injector)
- {
- Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);
-
- _link = new WritableLink<T>(remoteEndpoint, injector);
- _cancellationSource = new CancellationTokenSource();
- _disposed = false;
- }
-
- /// <summary>
- /// Construct a TransportClient.
- /// Used to send messages to the specified remote endpoint.
- /// </summary>
- /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
- /// <param name="observer">Callback used when receiving responses from remote host</param>
- /// <param name="injector">The injector to pass arguments to incoming messages</param>
- public WritableTransportClient(IPEndPoint remoteEndpoint,
- IObserver<TransportEvent<T>> observer,
- IInjector injector)
- : this(remoteEndpoint, injector)
- {
- _observer = observer;
- Task.Run(() => ResponseLoop());
- }
-
- /// <summary>
- /// Gets the underlying transport link.
- /// </summary>
- public ILink<T> Link
- {
- get { return _link; }
- }
-
- /// <summary>
- /// Send the remote message.
- /// </summary>
- /// <param name="message">The message to send</param>
- public void Send(T message)
- {
- if (message == null)
- {
- throw new ArgumentNullException("message");
- }
-
- _link.Write(message);
- }
-
- /// <summary>
- /// Close all opened connections
- /// </summary>
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected void Dispose(bool disposing)
- {
- if (!_disposed && disposing)
- {
- _link.Dispose();
- _disposed = true;
- }
- }
-
- /// <summary>
- /// Continually read responses from remote host
- /// </summary>
- private async Task ResponseLoop()
- {
- while (!_cancellationSource.IsCancellationRequested)
- {
- T message = await _link.ReadAsync(_cancellationSource.Token);
- if (message == null)
- {
- break;
- }
-
- TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
- _observer.OnNext(transportEvent);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
deleted file mode 100644
index 6b5961f..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
- /// <summary>
- /// Server to handle incoming remote messages.
- /// </summary>
- /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableTransportServer<T> : IDisposable where T : IWritable
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof (TransportServer<>));
-
- private TcpListener _listener;
- private readonly CancellationTokenSource _cancellationSource;
- private readonly IObserver<TransportEvent<T>> _remoteObserver;
- private readonly ITcpPortProvider _tcpPortProvider;
- private readonly IInjector _injector;
- private bool _disposed;
- private Task _serverTask;
- /// <summary>
- /// Constructs a TransportServer to listen for remote events.
- /// Listens on the specified remote endpoint. When it recieves a remote
- /// event, it will envoke the specified remote handler.
- /// </summary>
- /// <param name="port">Port to listen on</param>
- /// <param name="remoteHandler">The handler to invoke when receiving incoming
- /// remote messages</param>
- /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
- /// <param name="injector">The injector to pass arguments to incoming messages</param>
- public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ITcpPortProvider tcpPortProvider, IInjector injector)
- : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, tcpPortProvider, injector)
- {
- }
-
- /// <summary>
- /// Constructs a TransportServer to listen for remote events.
- /// Listens on the specified remote endpoint. When it recieves a remote
- /// event, it will envoke the specified remote handler.
- /// </summary>
- /// <param name="localEndpoint">Endpoint to listen on</param>
- /// <param name="remoteHandler">The handler to invoke when receiving incoming
- /// remote messages</param>
- /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
- /// <param name="injector">The injector to pass arguments to incoming messages</param>
- public WritableTransportServer(
- IPEndPoint localEndpoint,
- IObserver<TransportEvent<T>> remoteHandler,
- ITcpPortProvider tcpPortProvider,
- IInjector injector)
- {
- _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
- _remoteObserver = remoteHandler;
- _tcpPortProvider = tcpPortProvider;
- _cancellationSource = new CancellationTokenSource();
- _cancellationSource.Token.ThrowIfCancellationRequested();
- _injector = injector;
- _disposed = false;
- }
-
- /// <summary>
- /// Returns the listening endpoint for the TransportServer
- /// </summary>
- public IPEndPoint LocalEndpoint
- {
- get { return _listener.LocalEndpoint as IPEndPoint; }
- }
-
- /// <summary>
- /// Starts listening for incoming remote messages.
- /// </summary>
- public void Run()
- {
- if (LocalEndpoint.Port == 0)
- {
- FindAPortAndStartListener();
- }
- else
- {
- _listener.Start();
- }
-
- _serverTask = Task.Run(() => StartServer());
- }
-
- private void FindAPortAndStartListener()
- {
- var foundAPort = false;
- var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
- for (var enumerator = _tcpPortProvider.GetEnumerator();
- !foundAPort && enumerator.MoveNext();
- )
- {
- _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
- try
- {
- _listener.Start();
- foundAPort = true;
- }
- catch (SocketException e)
- {
- exception = e;
- }
- }
- if (!foundAPort)
- {
- Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
- }
- LOGGER.Log(Level.Info,
- String.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
- }
-
-
- /// <summary>
- /// Close the TransportServer and all open connections
- /// </summary>
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- public void Dispose(bool disposing)
- {
- if (!_disposed && disposing)
- {
- _cancellationSource.Cancel();
-
- try
- {
- _listener.Stop();
- }
- catch (SocketException)
- {
- LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
- }
-
- if (_serverTask != null)
- {
- _serverTask.Wait();
-
- // Give the TransportServer Task 500ms to shut down, ignore any timeout errors
- try
- {
- CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500);
- _serverTask.Wait(serverDisposeTimeout.Token);
- }
- catch (Exception e)
- {
- Console.Error.WriteLine(e);
- }
- finally
- {
- _serverTask.Dispose();
- }
- }
- }
-
- _disposed = true;
- }
-
- /// <summary>
- /// Helper method to start TransportServer. This will
- /// be run in an asynchronous Task.
- /// </summary>
- /// <returns>An asynchronous Task for the running server.</returns>
- private async Task StartServer()
- {
- try
- {
- while (!_cancellationSource.Token.IsCancellationRequested)
- {
- TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
- ProcessClient(client).Forget();
- }
- }
- catch (InvalidOperationException)
- {
- LOGGER.Log(Level.Info, "TransportServer has been closed.");
- }
- catch (OperationCanceledException)
- {
- LOGGER.Log(Level.Info, "TransportServer has been closed.");
- }
- }
-
- /// <summary>
- /// Recieves event from connected TcpClient and invokes handler on the event.
- /// </summary>
- /// <param name="client">The connected client</param>
- private async Task ProcessClient(TcpClient client)
- {
-
- // Keep reading messages from client until they disconnect or timeout
- CancellationToken token = _cancellationSource.Token;
- using (ILink<T> link = new WritableLink<T>(client, _injector))
- {
- while (!token.IsCancellationRequested)
- {
- T message = await link.ReadAsync(token);
-
- if (message == null)
- {
- break;
- }
-
- TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
- _remoteObserver.OnNext(transportEvent);
- }
- LOGGER.Log(Level.Error,
- "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
- }
- }
- }
-}
\ No newline at end of file
[2/2] incubator-reef git commit: [REEF-422] Convert Wake Layer from
Writable to Streaming
Posted by we...@apache.org.
[REEF-422] Convert Wake Layer from Writable to Streaming
This addressed the issue by
* using `StreamingCodec` in WritableLink instead of `Writable` by
reflection.
* Removing Writable constraint on generic in `WritableLink`,
`WritableTransportServer`, `WritableTransportClient`,
`WritableRemoteManager`
* Introduce `TemporaryWritableToStreamingCodec` so that Network
Service layer can still use `Writable` interface.
`StreamingCodec` is assumed to have at most static argument parameters.
That is, it cannot have parameters that change from one message to
another.
JIRA:
[REEF-422](https://issues.apache.org/jira/browse/REEF-422)
Pull Request:
This closes #261
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ba2653d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ba2653d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ba2653d6
Branch: refs/heads/master
Commit: ba2653d6bfac3aabaf640474efbdfcb18c87baf8
Parents: ec9b497
Author: Dhruv <dh...@gmail.com>
Authored: Tue Jun 30 11:01:06 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Jul 7 13:56:04 2015 -0700
----------------------------------------------------------------------
.../NetworkService/WritableNetworkService.cs | 2 +-
.../Org.Apache.REEF.Wake.Tests.csproj | 5 +-
.../PrefixedStringWritable.cs | 108 -----
.../StreamingRemoteManagerTest.cs | 340 +++++++++++++++
.../StreamingTransportTest.cs | 177 ++++++++
.../WritableRemoteManagerTest.cs | 437 -------------------
.../WritableTransportTest.cs | 223 ----------
.../Org.Apache.REEF.Wake.csproj | 15 +-
.../Remote/IWritableRemoteEvent.cs | 45 --
.../Remote/Impl/RemoteEvent.cs | 4 +
.../Remote/Impl/RemoteEventStreamingCodec.cs | 82 ++++
.../Remote/Impl/StreamingLink.cs | 258 +++++++++++
.../Remote/Impl/StreamingRemoteManager.cs | 256 +++++++++++
.../Impl/StreamingRemoteManagerFactory.cs | 51 +++
.../Remote/Impl/StreamingTransportClient.cs | 125 ++++++
.../Remote/Impl/StreamingTransportServer.cs | 212 +++++++++
.../Impl/TemporaryWritableToStreamingCodec.cs | 70 +++
.../Remote/Impl/WritableLink.cs | 295 -------------
.../Remote/Impl/WritableObserverContainer.cs | 132 ------
.../Remote/Impl/WritableRemoteEvent.cs | 115 -----
.../Remote/Impl/WritableRemoteManager.cs | 286 ------------
.../Remote/Impl/WritableRemoteManagerFactory.cs | 59 ---
.../Remote/Impl/WritableTransportClient.cs | 132 ------
.../Remote/Impl/WritableTransportServer.cs | 244 -----------
24 files changed, 1585 insertions(+), 2088 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
index f383697..7d9d015 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
@@ -62,7 +62,7 @@ namespace Org.Apache.REEF.Network.NetworkService
IObserver<WritableNsMessage<T>> messageHandler,
IIdentifierFactory idFactory,
INameClient nameClient,
- WritableRemoteManagerFactory remoteManagerFactory)
+ StreamingRemoteManagerFactory remoteManagerFactory)
{
IPAddress localAddress = NetworkUtils.LocalIPAddress;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index 581508b..babc26d 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -46,12 +46,11 @@ under the License.
<ItemGroup>
<Compile Include="ClockTest.cs" />
<Compile Include="MultiCodecTest.cs" />
- <Compile Include="PrefixedStringWritable.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PubSubSubjectTest.cs" />
<Compile Include="RemoteManagerTest.cs" />
- <Compile Include="WritableRemoteManagerTest.cs" />
- <Compile Include="WritableTransportTest.cs" />
+ <Compile Include="StreamingRemoteManagerTest.cs" />
+ <Compile Include="StreamingTransportTest.cs" />
<Compile Include="TransportTest.cs" />
<Compile Include="WritableString.cs" />
</ItemGroup>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
deleted file mode 100644
index dbb8af3..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
- [NamedParameter("identifier in PrefixedWritable")]
- public class StringId : Name<int>
- {
- }
-
- /// <summary>
- /// Writable wrapper around the string class which takes integer prefix
- /// This class is used to test non empty injector in TransportServer and Client
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class PrefixedStringWritable : IWritable
- {
- private readonly int _id;
- private string _data;
-
- /// <summary>
- /// Returns the actual string data
- /// </summary>
- public string Data
- {
- get { return _data + "_" + _id; }
- set { _data = value; }
- }
-
- /// <summary>
- /// Empty constructor for instantiation with reflection
- /// </summary>
- [Inject]
- public PrefixedStringWritable([Parameter(typeof(StringId))] int id)
- {
- _id = id;
- }
-
- /// <summary>
- /// Constructor
- /// </summary>
- /// <param name="data">The string data</param>
- public PrefixedStringWritable(string data)
- {
- _data = data;
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- public void Read(IDataReader reader)
- {
- _data = reader.ReadString();
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- public void Write(IDataWriter writer)
- {
- writer.WriteString(_data);
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- /// <param name="token">the cancellation token</param>
- public async Task ReadAsync(IDataReader reader, CancellationToken token)
- {
- _data = await reader.ReadStringAsync(token);
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- /// <param name="token">the cancellation token</param>
- public async Task WriteAsync(IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(_data, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
new file mode 100644
index 0000000..20f75be
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ [TestClass]
+ public class StreamingRemoteManagerTest
+ {
+ private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 =
+ TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
+
+ private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 =
+ TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
+
+ /// <summary>
+ /// Tests one way communication between Remote Managers
+ /// Remote Manager listens on any available port
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingOneWayCommunication()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var observer = Observer.Create<WritableString>(queue.Add);
+ IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+ remoteManager2.RegisterObserver(endpoint1, observer);
+
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext(new WritableString("ghi"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ /// <summary>
+ /// Tests two way communications. Checks whether both sides are able to receive messages
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingTwoWayCommunication()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+ List<string> events1 = new List<string>();
+ List<string> events2 = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // Register observers for remote manager 1 and remote manager 2
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer1 = Observer.Create<WritableString>(queue1.Add);
+ var observer2 = Observer.Create<WritableString>(queue2.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+ // Remote manager 1 sends 3 events to remote manager 2
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("def"));
+ remoteObserver1.OnNext(new WritableString("ghi"));
+
+ // Remote manager 2 sends 4 events to remote manager 1
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ remoteObserver2.OnNext(new WritableString("jkl"));
+ remoteObserver2.OnNext(new WritableString("mno"));
+ remoteObserver2.OnNext(new WritableString("pqr"));
+ remoteObserver2.OnNext(new WritableString("stu"));
+
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ }
+
+ Assert.AreEqual(4, events1.Count);
+ Assert.AreEqual(3, events2.Count);
+ }
+
+ /// <summary>
+ /// Tests one way communication between 3 nodes.
+ /// nodes 1 and 2 send messages to node 3
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingCommunicationThreeNodesOneWay()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer = Observer.Create<WritableString>(queue.Add);
+ remoteManager3.RegisterObserver(remoteEndpoint, observer);
+
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+ remoteObserver2.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("def"));
+ remoteObserver2.OnNext(new WritableString("ghi"));
+ remoteObserver1.OnNext(new WritableString("jkl"));
+ remoteObserver2.OnNext(new WritableString("mno"));
+
+ for (int i = 0; i < 5; i++)
+ {
+ events.Add(queue.Take().Data);
+ }
+ }
+
+ Assert.AreEqual(5, events.Count);
+ }
+
+ /// <summary>
+ /// Tests one way communication between 3 nodes.
+ /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingCommunicationThreeNodesBothWays()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
+ List<string> events1 = new List<string>();
+ List<string> events2 = new List<string>();
+ List<string> events3 = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+
+ var observer = Observer.Create<WritableString>(queue1.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer);
+ var observer2 = Observer.Create<WritableString>(queue2.Add);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+ var observer3 = Observer.Create<WritableString>(queue3.Add);
+ remoteManager3.RegisterObserver(remoteEndpoint, observer3);
+
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+ // Observer 1 and 2 send messages to observer 3
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver2.OnNext(new WritableString("def"));
+ remoteObserver2.OnNext(new WritableString("def"));
+
+ // Observer 3 sends messages back to observers 1 and 2
+ var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
+
+ remoteObserver3A.OnNext(new WritableString("ghi"));
+ remoteObserver3A.OnNext(new WritableString("ghi"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ }
+
+ Assert.AreEqual(2, events1.Count);
+ Assert.AreEqual(3, events2.Count);
+ Assert.AreEqual(5, events3.Count);
+ }
+
+ /// <summary>
+ /// Tests whether remote manager is able to send acknowledgement back
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingRemoteSenderCallback()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // Register handler for when remote manager 2 receives events; respond
+ // with an ack
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+
+ var receiverObserver = Observer.Create<WritableString>(
+ message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
+ remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
+
+ // Register handler for remote manager 1 to record the ack
+ var senderObserver = Observer.Create<WritableString>(queue.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
+
+ // Begin to send messages
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext(new WritableString("hello"));
+ remoteObserver1.OnNext(new WritableString("there"));
+ remoteObserver1.OnNext(new WritableString("buddy"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ Assert.AreEqual("received message: hello", events[0]);
+ Assert.AreEqual("received message: there", events[1]);
+ Assert.AreEqual("received message: buddy", events[2]);
+ }
+
+ /// <summary>
+ /// Test whether observer can be created with IRemoteMessage interface
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingRegisterObserverByType()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
+ var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
+ remoteManager2.RegisterObserver(observer);
+
+ // Remote manager 1 sends 3 events to remote manager 2
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext(new WritableString("ghi"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ /// <summary>
+ /// Tests whether we get the cached observer back for sending message without reinstantiating it
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingCachedConnection()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var observer = Observer.Create<WritableString>(queue.Add);
+ IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+ remoteManager2.RegisterObserver(endpoint1, observer);
+
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+
+ var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ cachedObserver.OnNext(new WritableString("ghi"));
+ cachedObserver.OnNext(new WritableString("jkl"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(4, events.Count);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
new file mode 100644
index 0000000..268da70
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ /// <summary>
+ /// Tests the StreamingTransportServer, StreamingTransportClient and StreamingLink.
+ /// Basically the Wake transport layer.
+ /// </summary>
+ [TestClass]
+ public class StreamingTransportTest
+ {
+ private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940);
+ private readonly IInjector _injector = TangFactory.GetTang().NewInjector();
+
+ /// <summary>
+ /// Tests whether StreamingTransportServer receives
+ /// string messages from StreamingTransportClient
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingTransportServer()
+ {
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+ IStreamingCodec<string> stringCodec = _injector.GetInstance<StringStreamingCodec>();
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+ using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec))
+ {
+ server.Run();
+
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+ using (var client = new StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send("World!");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(3, events.Count);
+ Assert.AreEqual(events[0], "Hello");
+ Assert.AreEqual(events[1], ", ");
+ Assert.AreEqual(events[2], "World!");
+ }
+
+ /// <summary>
+ /// Checks whether StreamingTransportClient is able to receive messages from remote host
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingTransportSenderStage()
+ {
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+
+ List<string> events = new List<string>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ IStreamingCodec<string> stringCodec = _injector.GetInstance<StringStreamingCodec>();
+
+ // Server echoes the message back to the client
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => tEvent.Link.Write(tEvent.Data));
+
+ using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec))
+ {
+ server.Run();
+
+ var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+ using (var client = new StreamingTransportClient<string>(remoteEndpoint, clientHandler, stringCodec))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send(" World");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(3, events.Count);
+ Assert.AreEqual(events[0], "Hello");
+ Assert.AreEqual(events[1], ", ");
+ Assert.AreEqual(events[2], " World");
+ }
+
+ /// <summary>
+ /// Checks whether StreamingTransportClient and StreamingTransportServer works
+ /// in asynchronous condition while sending messages asynchronously from different
+ /// threads
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingRaceCondition()
+ {
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+ IStreamingCodec<string> stringCodec = _injector.GetInstance<StringStreamingCodec>();
+ int numEventsExpected = 150;
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+ using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec))
+ {
+ server.Run();
+
+ for (int i = 0; i < numEventsExpected / 3; i++)
+ {
+ Task.Run(() =>
+ {
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+ using (var client = new StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send("World!");
+ }
+ });
+ }
+
+ for (int i = 0; i < numEventsExpected; i++)
+ {
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(numEventsExpected, events.Count);
+
+ }
+
+ private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd)
+ {
+ var configuration = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindImplementation<ITcpPortProvider, TcpPortProvider>()
+ .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
+ .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - portRangeStart + 1).ToString())
+ .Build();
+ return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
deleted file mode 100644
index 49c0f5b..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Reactive;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Impl;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
- [TestClass]
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableRemoteManagerTest
- {
- private const int Id = 5;
-
- private static IConfiguration _config = TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, int>(
- GenericType<StringId>.Class, Id.ToString(CultureInfo.InvariantCulture)).Build();
-
- private readonly WritableRemoteManagerFactory _remoteManagerFactory1 =
- TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>();
-
- private readonly WritableRemoteManagerFactory _remoteManagerFactory2 =
- TangFactory.GetTang().NewInjector(_config).GetInstance<WritableRemoteManagerFactory>();
-
- /// <summary>
- /// Tests one way communication between Remote Managers
- /// Remote Manager listens on any available port
- /// </summary>
- [TestMethod]
- public void TestWritableOneWayCommunication()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- var observer = Observer.Create<WritableString>(queue.Add);
- IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
- remoteManager2.RegisterObserver(endpoint1, observer);
-
- var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(3, events.Count);
- }
-
- /// <summary>
- /// Tests one way communication between Remote Managers
- /// Remote manager listens on a particular port
- /// </summary>
- [TestMethod]
- public void TestWritableOneWayCommunicationClientOnly()
- {
- int listeningPort = NetworkUtils.GenerateRandomPort(8900, 8940);
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>())
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, listeningPort))
- {
- IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer = Observer.Create<WritableString>(queue.Add);
- remoteManager2.RegisterObserver(remoteEndpoint, observer);
-
- var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(3, events.Count);
- }
-
- /// <summary>
- /// Tests two way communications. Checks whether both sides are able to receive messages
- /// </summary>
- [TestMethod]
- public void TestWritableTwoWayCommunication()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
- List<string> events1 = new List<string>();
- List<string> events2 = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- // Register observers for remote manager 1 and remote manager 2
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer1 = Observer.Create<WritableString>(queue1.Add);
- var observer2 = Observer.Create<WritableString>(queue2.Add);
- remoteManager1.RegisterObserver(remoteEndpoint, observer1);
- remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-
- // Remote manager 1 sends 3 events to remote manager 2
- var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("def"));
- remoteObserver1.OnNext(new WritableString("ghi"));
-
- // Remote manager 2 sends 4 events to remote manager 1
- var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- remoteObserver2.OnNext(new WritableString("jkl"));
- remoteObserver2.OnNext(new WritableString("mno"));
- remoteObserver2.OnNext(new WritableString("pqr"));
- remoteObserver2.OnNext(new WritableString("stu"));
-
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
-
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- }
-
- Assert.AreEqual(4, events1.Count);
- Assert.AreEqual(3, events2.Count);
- }
-
- /// <summary>
- /// Tests two way communications where message needs an injectable argument
- /// to be passed. Checks whether both sides are able to receive messages
- /// </summary>
- [TestMethod]
- public void TestNonEmptyArgumentInjectionWritableTwoWayCommunication()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<PrefixedStringWritable> queue1 = new BlockingCollection<PrefixedStringWritable>();
- BlockingCollection<PrefixedStringWritable> queue2 = new BlockingCollection<PrefixedStringWritable>();
- List<string> events1 = new List<string>();
- List<string> events2 = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
- {
- // Register observers for remote manager 1 and remote manager 2
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer1 = Observer.Create<PrefixedStringWritable>(queue1.Add);
- var observer2 = Observer.Create<PrefixedStringWritable>(queue2.Add);
- remoteManager1.RegisterObserver(remoteEndpoint, observer1);
- remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-
- // Remote manager 1 sends 3 events to remote manager 2
- var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new PrefixedStringWritable("abc"));
- remoteObserver1.OnNext(new PrefixedStringWritable("def"));
- remoteObserver1.OnNext(new PrefixedStringWritable("ghi"));
-
- // Remote manager 2 sends 4 events to remote manager 1
- var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- remoteObserver2.OnNext(new PrefixedStringWritable("jkl"));
- remoteObserver2.OnNext(new PrefixedStringWritable("mno"));
- remoteObserver2.OnNext(new PrefixedStringWritable("pqr"));
- remoteObserver2.OnNext(new PrefixedStringWritable("stu"));
-
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
-
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- }
-
- Assert.AreEqual(4, events1.Count);
- Assert.AreEqual(3, events2.Count);
- }
-
- /// <summary>
- /// Tests one way communication between 3 nodes.
- /// nodes 1 and 2 send messages to node 3
- /// </summary>
- [TestMethod]
- public void TestWritableCommunicationThreeNodesOneWay()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer = Observer.Create<WritableString>(queue.Add);
- remoteManager3.RegisterObserver(remoteEndpoint, observer);
-
- var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
- var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
-
- remoteObserver2.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("def"));
- remoteObserver2.OnNext(new WritableString("ghi"));
- remoteObserver1.OnNext(new WritableString("jkl"));
- remoteObserver2.OnNext(new WritableString("mno"));
-
- for (int i = 0; i < 5; i++)
- {
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(5, events.Count);
- }
-
- /// <summary>
- /// Tests one way communication between 3 nodes.
- /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
- /// </summary>
- [TestMethod]
- public void TestWritableCommunicationThreeNodesBothWays()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
- List<string> events1 = new List<string>();
- List<string> events2 = new List<string>();
- List<string> events3 = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-
- var observer = Observer.Create<WritableString>(queue1.Add);
- remoteManager1.RegisterObserver(remoteEndpoint, observer);
- var observer2 = Observer.Create<WritableString>(queue2.Add);
- remoteManager2.RegisterObserver(remoteEndpoint, observer2);
- var observer3 = Observer.Create<WritableString>(queue3.Add);
- remoteManager3.RegisterObserver(remoteEndpoint, observer3);
-
- var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
- var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
-
- // Observer 1 and 2 send messages to observer 3
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver2.OnNext(new WritableString("def"));
- remoteObserver2.OnNext(new WritableString("def"));
-
- // Observer 3 sends messages back to observers 1 and 2
- var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
- var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
-
- remoteObserver3A.OnNext(new WritableString("ghi"));
- remoteObserver3A.OnNext(new WritableString("ghi"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
-
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
-
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
-
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- }
-
- Assert.AreEqual(2, events1.Count);
- Assert.AreEqual(3, events2.Count);
- Assert.AreEqual(5, events3.Count);
- }
-
- /// <summary>
- /// Tests whether remote manager is able to send acknowledgement back
- /// </summary>
- [TestMethod]
- public void TestWritableRemoteSenderCallback()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- // Register handler for when remote manager 2 receives events; respond
- // with an ack
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-
- var receiverObserver = Observer.Create<WritableString>(
- message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
- remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
-
- // Register handler for remote manager 1 to record the ack
- var senderObserver = Observer.Create<WritableString>(queue.Add);
- remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
-
- // Begin to send messages
- var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new WritableString("hello"));
- remoteObserver1.OnNext(new WritableString("there"));
- remoteObserver1.OnNext(new WritableString("buddy"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(3, events.Count);
- Assert.AreEqual("received message: hello", events[0]);
- Assert.AreEqual("received message: there", events[1]);
- Assert.AreEqual("received message: buddy", events[2]);
- }
-
- /// <summary>
- /// Test whether observer can be created with IRemoteMessage interface
- /// </summary>
- [TestMethod]
- public void TestWritableRegisterObserverByType()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
- var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
- remoteManager2.RegisterObserver(observer);
-
- // Remote manager 1 sends 3 events to remote manager 2
- var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(3, events.Count);
- }
-
- /// <summary>
- /// Tests whether we get the cached observer back for sending message without reinstantiating it
- /// </summary>
- [TestMethod]
- public void TestWritableCachedConnection()
- {
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- {
- var observer = Observer.Create<WritableString>(queue.Add);
- IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
- remoteManager2.RegisterObserver(endpoint1, observer);
-
- var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
-
- var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- cachedObserver.OnNext(new WritableString("ghi"));
- cachedObserver.OnNext(new WritableString("jkl"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
-
- Assert.AreEqual(4, events.Count);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
deleted file mode 100644
index 2255cfa..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Reactive;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Remote.Parameters;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
- /// <summary>
- /// Tests the WritableTransportServer, WritableTransportClient and WritableLink.
- /// Basically the Wake transport layer.
- /// </summary>
- [TestClass]
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableTransportTest
- {
- private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940);
- private readonly IInjector _injector = TangFactory.GetTang().NewInjector();
-
- /// <summary>
- /// Tests whether WritableTransportServer receives
- /// string messages from WritableTransportClient
- /// </summary>
- [TestMethod]
- public void TestWritableTransportServer()
- {
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
- List<string> events = new List<string>();
-
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
- var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
-
- using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector))
- {
- server.Run();
-
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
- using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, _injector))
- {
- client.Send(new WritableString("Hello"));
- client.Send(new WritableString(", "));
- client.Send(new WritableString("World!"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(3, events.Count);
- Assert.AreEqual(events[0], "Hello");
- Assert.AreEqual(events[1], ", ");
- Assert.AreEqual(events[2], "World!");
- }
-
-
- /// <summary>
- /// Tests whether WritableTransportServer receives
- /// string messages from WritableTransportClient with non empty injector
- /// </summary>
- [TestMethod]
- public void TestNonEmptyInjectionTransportServer()
- {
- int id = 5;
- IConfiguration config = TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, int>(
- GenericType<StringId>.Class, id.ToString(CultureInfo.InvariantCulture)).Build();
-
- IInjector injector = TangFactory.GetTang().NewInjector(config);
-
- BlockingCollection<PrefixedStringWritable> queue = new BlockingCollection<PrefixedStringWritable>();
- List<string> events = new List<string>();
-
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
- var remoteHandler = Observer.Create<TransportEvent<PrefixedStringWritable>>(tEvent => queue.Add(tEvent.Data));
-
- using (var server = new WritableTransportServer<PrefixedStringWritable>(endpoint, remoteHandler, _tcpPortProvider, injector.ForkInjector()))
- {
- server.Run();
-
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
- using (var client = new WritableTransportClient<PrefixedStringWritable>(remoteEndpoint, injector.ForkInjector()))
- {
- client.Send(new PrefixedStringWritable("Hello"));
- client.Send(new PrefixedStringWritable(", "));
- client.Send(new PrefixedStringWritable("World!"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(3, events.Count);
- Assert.AreEqual(events[0], "Hello_" + id);
- Assert.AreEqual(events[1], ", _" + id);
- Assert.AreEqual(events[2], "World!_" + id);
- }
-
-
- /// <summary>
- /// Checks whether WritableTransportClient is able to receive messages from remote host
- /// </summary>
- [TestMethod]
- public void TestWritableTransportSenderStage()
- {
-
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-
- List<string> events = new List<string>();
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-
- // Server echoes the message back to the client
- var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => tEvent.Link.Write(tEvent.Data));
-
- using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector))
- {
- server.Run();
-
- var clientHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
- using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, clientHandler, _injector))
- {
- client.Send(new WritableString("Hello"));
- client.Send(new WritableString(", "));
- client.Send(new WritableString(" World"));
-
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(3, events.Count);
- Assert.AreEqual(events[0], "Hello");
- Assert.AreEqual(events[1], ", ");
- Assert.AreEqual(events[2], " World");
- }
-
- /// <summary>
- /// Checks whether WritableTransportClient and WritableTransportServer works
- /// in asynchronous condition while sending messages asynchronously from different
- /// threads
- /// </summary>
- [TestMethod]
- public void TestWritableRaceCondition()
- {
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
- List<string> events = new List<string>();
- int numEventsExpected = 150;
-
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
- var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
-
- using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector))
- {
- server.Run();
-
- for (int i = 0; i < numEventsExpected / 3; i++)
- {
- Task.Run(() =>
- {
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
- using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, _injector))
- {
- client.Send(new WritableString("Hello"));
- client.Send(new WritableString(", "));
- client.Send(new WritableString("World!"));
- }
- });
- }
-
- for (int i = 0; i < numEventsExpected; i++)
- {
- events.Add(queue.Take().Data);
- }
- }
-
- Assert.AreEqual(numEventsExpected, events.Count);
-
- }
-
- private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd)
- {
- var configuration = TangFactory.GetTang().NewConfigurationBuilder()
- .BindImplementation<ITcpPortProvider, TcpPortProvider>()
- .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
- .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - portRangeStart + 1).ToString())
- .Build();
- return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index d2b2970..4069d15 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -48,7 +48,9 @@ under the License.
<Compile Include="IEventHandler.cs" />
<Compile Include="IIdentifier.cs" />
<Compile Include="IIdentifierFactory.cs" />
- <Compile Include="Remote\Impl\WritableRemoteManagerFactory.cs" />
+ <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
+ <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" />
+ <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
<Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
<Compile Include="Impl\LoggingEventHandler.cs" />
<Compile Include="Impl\MissingStartHandlerHandler.cs" />
@@ -71,15 +73,12 @@ under the License.
<Compile Include="Remote\IDataWriter.cs" />
<Compile Include="Remote\Impl\StreamDataReader.cs" />
<Compile Include="Remote\Impl\StreamDataWriter.cs" />
- <Compile Include="Remote\Impl\WritableRemoteManager.cs" />
- <Compile Include="Remote\Impl\WritableLink.cs" />
- <Compile Include="Remote\Impl\WritableObserverContainer.cs" />
- <Compile Include="Remote\Impl\WritableRemoteEvent.cs" />
- <Compile Include="Remote\Impl\WritableTransportClient.cs" />
- <Compile Include="Remote\Impl\WritableTransportServer.cs" />
+ <Compile Include="Remote\Impl\StreamingRemoteManager.cs" />
+ <Compile Include="Remote\Impl\StreamingLink.cs" />
+ <Compile Include="Remote\Impl\StreamingTransportClient.cs" />
+ <Compile Include="Remote\Impl\StreamingTransportServer.cs" />
<Compile Include="Remote\IRemoteManagerFactory.cs" />
<Compile Include="Remote\IWritable.cs" />
- <Compile Include="Remote\IWritableRemoteEvent.cs" />
<Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
<Compile Include="Remote\ICodec.cs" />
<Compile Include="Remote\ICodecFactory.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
deleted file mode 100644
index 40222aa..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Linq.Expressions;
-using System.Net;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
- /// <summary>
- /// Interface for remote event
- /// </summary>
- /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- internal interface IWritableRemoteEvent<T> : IWritable where T : IWritable
- {
- /// <summary>
- /// Local Endpoint
- /// </summary>
- IPEndPoint LocalEndPoint { get; set; }
-
- /// <summary>
- /// Remote Endpoint
- /// </summary>
- IPEndPoint RemoteEndPoint { get; set; }
-
- T Value { get; }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
index b39b20f..bfed7f9 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
@@ -17,6 +17,7 @@
* under the License.
*/
+using System;
using System.Net;
using Org.Apache.REEF.Tang.Annotations;
@@ -50,12 +51,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
public IPEndPoint RemoteEndPoint { get; set; }
+ [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)]
public string Source { get; set; }
+ [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)]
public string Sink { get; set; }
public T Value { get; set; }
+ [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)]
public long Sequence { get; set; }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
new file mode 100644
index 0000000..01acd73
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Writable remote event class
+ /// </summary>
+ /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
+ internal sealed class RemoteEventStreamingCodec<T> : IStreamingCodec<IRemoteEvent<T>>
+ {
+ private readonly IStreamingCodec<T> _codec;
+
+ internal RemoteEventStreamingCodec(IStreamingCodec<T> codec)
+ {
+ _codec = codec;
+ }
+
+ /// <summary>
+ /// Read the class fields.
+ /// </summary>
+ /// <param name="reader">The reader from which to read </param>
+ /// <returns>The remote event</returns>
+ public IRemoteEvent<T> Read(IDataReader reader)
+ {
+ return new RemoteEvent<T>(null, null, _codec.Read(reader));
+ }
+
+ /// <summary>
+ /// Writes the class fields.
+ /// </summary>
+ /// <param name="value">The remote event</param>
+ /// <param name="writer">The writer to which to write</param>
+ public void Write(IRemoteEvent<T> value, IDataWriter writer)
+ {
+ _codec.Write(value.Value, writer);
+ }
+
+ /// <summary>
+ /// Read the class fields.
+ /// </summary>
+ /// <param name="reader">The reader from which to read </param>
+ /// <param name="token">The cancellation token</param>
+ /// <returns>The remote event</returns>
+ public async Task<IRemoteEvent<T>> ReadAsync(IDataReader reader, CancellationToken token)
+ {
+ T message = await _codec.ReadAsync(reader, token);
+ return new RemoteEvent<T>(null, null, message);
+ }
+
+ /// <summary>
+ /// Writes the class fields.
+ /// </summary>
+ /// <param name="value">The remote event</param>
+ /// <param name="writer">The writer to which to write</param>
+ /// <param name="token">The cancellation token</param>
+ public async Task WriteAsync(IRemoteEvent<T> value, IDataWriter writer, CancellationToken token)
+ {
+ await _codec.WriteAsync(value.Value, writer, token);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
new file mode 100644
index 0000000..4396b56
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Represents an open connection between remote hosts. This class is not thread safe
+ /// </summary>
+ /// <typeparam name="T">Generic Type of message.</typeparam>
+ internal sealed class StreamingLink<T> : ILink<T>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingLink<T>));
+
+ private readonly IPEndPoint _localEndpoint;
+ private bool _disposed;
+ private readonly IStreamingCodec<T> _streamingCodec;
+ private readonly TcpClient _client;
+
+ /// <summary>
+ /// Stream reader to be passed to codec
+ /// </summary>
+ private readonly StreamDataReader _reader;
+
+ /// <summary>
+ /// Stream writer from which to read from in codec
+ /// </summary>
+ private readonly StreamDataWriter _writer;
+
+ /// <summary>
+ /// Constructs a Link object.
+ /// Connects to the specified remote endpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
+ /// <param name="streamingCodec">Streaming codec</param>
+ internal StreamingLink(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ _client = new TcpClient();
+ _client.Connect(remoteEndpoint);
+
+ var stream = _client.GetStream();
+ _localEndpoint = GetLocalEndpoint();
+ _disposed = false;
+ _reader = new StreamDataReader(stream);
+ _writer = new StreamDataWriter(stream);
+ _streamingCodec = streamingCodec;
+ }
+
+ /// <summary>
+ /// Constructs a Link object.
+ /// Uses the already connected TcpClient.
+ /// </summary>
+ /// <param name="client">The already connected client</param>
+ /// <param name="streamingCodec">Streaming codec</param>
+ internal StreamingLink(TcpClient client, IStreamingCodec<T> streamingCodec)
+ {
+ if (client == null)
+ {
+ throw new ArgumentNullException("client");
+ }
+
+ _client = client;
+ var stream = _client.GetStream();
+ _localEndpoint = GetLocalEndpoint();
+ _disposed = false;
+ _reader = new StreamDataReader(stream);
+ _writer = new StreamDataWriter(stream);
+ _streamingCodec = streamingCodec;
+ }
+
+ /// <summary>
+ /// Returns the local socket address
+ /// </summary>
+ public IPEndPoint LocalEndpoint
+ {
+ get { return _localEndpoint; }
+ }
+
+ /// <summary>
+ /// Returns the remote socket address
+ /// </summary>
+ public IPEndPoint RemoteEndpoint
+ {
+ get { return (IPEndPoint) _client.Client.RemoteEndPoint; }
+ }
+
+ /// <summary>
+ /// Writes the message to the remote host
+ /// </summary>
+ /// <param name="value">The data to write</param>
+ public void Write(T value)
+ {
+ if (value == null)
+ {
+ throw new ArgumentNullException("value");
+ }
+ if (_disposed)
+ {
+ Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
+ }
+
+ _streamingCodec.Write(value, _writer);
+ }
+
+ /// <summary>
+ /// Writes the value to this link asynchronously
+ /// </summary>
+ /// <param name="value">The data to write</param>
+ /// <param name="token">The cancellation token</param>
+ public async Task WriteAsync(T value, CancellationToken token)
+ {
+ if (_disposed)
+ {
+ Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
+ }
+
+ await _streamingCodec.WriteAsync(value, _writer, token);
+ }
+
+ /// <summary>
+ /// Reads the value from the link synchronously
+ /// </summary>
+ public T Read()
+ {
+ if (_disposed)
+ {
+ Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
+ }
+
+ try
+ {
+ T value = _streamingCodec.Read(_reader);
+ return value;
+ }
+ catch (Exception e)
+ {
+ Logger.Log(Level.Warning, "In Read function unable to read the message.");
+ Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Reads the value from the link asynchronously
+ /// </summary>
+ /// <param name="token">The cancellation token</param>
+ public async Task<T> ReadAsync(CancellationToken token)
+ {
+ if (_disposed)
+ {
+ Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
+ }
+
+ try
+ {
+ T value = await _streamingCodec.ReadAsync(_reader, token);
+ return value;
+ }
+ catch (Exception e)
+ {
+ Logger.Log(Level.Warning, "In ReadAsync function unable to read the message.");
+ Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Close the client connection
+ /// </summary>
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ try
+ {
+ _client.GetStream().Close();
+ }
+ catch (InvalidOperationException)
+ {
+ Logger.Log(Level.Warning, "failed to close stream on a non-connected socket.");
+ }
+
+ _client.Close();
+ _disposed = true;
+ }
+
+ /// <summary>
+ /// Overrides Equals. Two Link objects are equal if they are connected
+ /// to the same remote endpoint.
+ /// </summary>
+ /// <param name="obj">The object to compare</param>
+ /// <returns>True if the object is equal to this Link, otherwise false</returns>
+ public override bool Equals(object obj)
+ {
+ Link<T> other = obj as Link<T>;
+ if (other == null)
+ {
+ return false;
+ }
+
+ return other.RemoteEndpoint.Equals(RemoteEndpoint);
+ }
+
+ /// <summary>
+ /// Gets the hash code for the Link object.
+ /// </summary>
+ /// <returns>The object's hash code</returns>
+ public override int GetHashCode()
+ {
+ return RemoteEndpoint.GetHashCode();
+ }
+
+ /// <summary>
+ /// Discovers the IPEndpoint for the current machine.
+ /// </summary>
+ /// <returns>The local IPEndpoint</returns>
+ private IPEndPoint GetLocalEndpoint()
+ {
+ IPAddress address = NetworkUtils.LocalIPAddress;
+ int port = ((IPEndPoint) _client.Client.LocalEndPoint).Port;
+ return new IPEndPoint(address, port);
+ }
+ }
+}
\ No newline at end of file