You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:42:53 UTC
[09/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code
base
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
new file mode 100644
index 0000000..d5b987a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Exceptions;
+using Org.Apache.Reef.Wake.Util;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Represents an open connection between remote hosts
+ /// </summary>
+ public class Link<T> : ILink<T>
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(Link<T>));
+
+     // Assigned once during construction and never replaced afterwards.
+     private readonly IPEndPoint _localEndpoint;
+     private readonly ICodec<T> _codec;
+     private readonly Channel _channel;
+     private bool _disposed;
+
+     /// <summary>
+     /// Constructs a Link object.
+     /// Opens a new TCP connection to the specified remote endpoint.
+     /// </summary>
+     /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
+     /// <param name="codec">The codec for serializing messages</param>
+     /// <exception cref="ArgumentNullException">If either argument is null</exception>
+     public Link(IPEndPoint remoteEndpoint, ICodec<T> codec)
+     {
+         if (remoteEndpoint == null)
+         {
+             throw new ArgumentNullException("remoteEndpoint");
+         }
+         if (codec == null)
+         {
+             throw new ArgumentNullException("codec");
+         }
+
+         Client = new TcpClient();
+         try
+         {
+             Client.Connect(remoteEndpoint);
+
+             _codec = codec;
+             _channel = new Channel(Client.GetStream());
+             _localEndpoint = GetLocalEndpoint();
+         }
+         catch
+         {
+             // This constructor created the client, so nobody else can
+             // release it if the connection attempt fails half way.
+             Client.Close();
+             throw;
+         }
+     }
+
+     /// <summary>
+     /// Constructs a Link object.
+     /// Uses the already connected TcpClient.
+     /// </summary>
+     /// <param name="client">The already connected client</param>
+     /// <param name="codec">The encoder and decoder</param>
+     /// <exception cref="ArgumentNullException">If either argument is null</exception>
+     public Link(TcpClient client, ICodec<T> codec)
+     {
+         if (client == null)
+         {
+             throw new ArgumentNullException("client");
+         }
+         if (codec == null)
+         {
+             throw new ArgumentNullException("codec");
+         }
+
+         // The client was supplied by the caller, so it is deliberately
+         // not closed here if wrapping its stream fails.
+         Client = client;
+         _codec = codec;
+         _channel = new Channel(Client.GetStream());
+         _localEndpoint = GetLocalEndpoint();
+     }
+
+     /// <summary>
+     /// Returns the local socket address
+     /// </summary>
+     public IPEndPoint LocalEndpoint
+     {
+         get { return _localEndpoint; }
+     }
+
+     /// <summary>
+     /// Returns the remote socket address, read from the underlying socket
+     /// on every access
+     /// </summary>
+     public IPEndPoint RemoteEndpoint
+     {
+         get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
+     }
+
+     /// <summary>
+     /// Gets the underlying TcpClient
+     /// </summary>
+     public TcpClient Client { get; private set; }
+
+     /// <summary>
+     /// Writes the message to the remote host
+     /// </summary>
+     /// <param name="value">The data to write</param>
+     /// <exception cref="ArgumentNullException">If value is null</exception>
+     public void Write(T value)
+     {
+         if (value == null)
+         {
+             throw new ArgumentNullException("value");
+         }
+         if (_disposed)
+         {
+             Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
+         }
+
+         byte[] message = _codec.Encode(value);
+         _channel.Write(message);
+     }
+
+     /// <summary>
+     /// Writes the value to this link asynchronously
+     /// </summary>
+     /// <param name="value">The data to write</param>
+     /// <param name="token">The cancellation token</param>
+     /// <exception cref="ArgumentNullException">If value is null; surfaced
+     /// through the returned task because this method is async</exception>
+     public async Task WriteAsync(T value, CancellationToken token)
+     {
+         // Same argument contract as the synchronous Write.
+         if (value == null)
+         {
+             throw new ArgumentNullException("value");
+         }
+         if (_disposed)
+         {
+             Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
+         }
+
+         byte[] message = _codec.Encode(value);
+         await _channel.WriteAsync(message, token);
+     }
+
+     /// <summary>
+     /// Reads the value from the link synchronously
+     /// </summary>
+     /// <returns>The decoded message, or default(T) when the channel
+     /// returned no data</returns>
+     public T Read()
+     {
+         if (_disposed)
+         {
+             Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
+         }
+
+         byte[] message = _channel.Read();
+         return (message == null) ? default(T) : _codec.Decode(message);
+     }
+
+     /// <summary>
+     /// Reads the value from the link asynchronously
+     /// </summary>
+     /// <param name="token">The cancellation token</param>
+     /// <returns>The decoded message, or default(T) when the channel
+     /// returned no data</returns>
+     public async Task<T> ReadAsync(CancellationToken token)
+     {
+         if (_disposed)
+         {
+             Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
+         }
+
+         byte[] message = await _channel.ReadAsync(token);
+         return (message == null) ? default(T) : _codec.Decode(message);
+     }
+
+     /// <summary>
+     /// Close the client connection
+     /// </summary>
+     public void Dispose()
+     {
+         Dispose(true);
+         GC.SuppressFinalize(this);
+     }
+
+     /// <summary>
+     /// Subclasses of Links should overwrite this to handle disposing
+     /// of the link. Calling it more than once has no further effect.
+     /// </summary>
+     /// <param name="disposing">To dispose or not</param>
+     public virtual void Dispose(bool disposing)
+     {
+         if (_disposed)
+         {
+             return;
+         }
+
+         if (disposing)
+         {
+             try
+             {
+                 Client.GetStream().Close();
+             }
+             catch (InvalidOperationException)
+             {
+                 // GetStream refuses to hand out a stream for a socket that
+                 // is not connected; there is nothing to close in that case.
+                 LOGGER.Log(Level.Warning, "failed to close stream on a non-connected socket.");
+             }
+
+             Client.Close();
+         }
+         _disposed = true;
+     }
+
+     /// <summary>
+     /// Overrides Equals. Two Link objects are equal if they are connected
+     /// to the same remote endpoint.
+     /// </summary>
+     /// <param name="obj">The object to compare</param>
+     /// <returns>True if the object is equal to this Link, otherwise false</returns>
+     public override bool Equals(object obj)
+     {
+         Link<T> other = obj as Link<T>;
+         if (other == null)
+         {
+             return false;
+         }
+
+         return other.RemoteEndpoint.Equals(RemoteEndpoint);
+     }
+
+     /// <summary>
+     /// Gets the hash code for the Link object, derived from the remote
+     /// endpoint so that it agrees with Equals.
+     /// </summary>
+     /// <returns>The object's hash code</returns>
+     public override int GetHashCode()
+     {
+         return RemoteEndpoint.GetHashCode();
+     }
+
+     /// <summary>
+     /// Discovers the IPEndpoint for the current machine: the address
+     /// reported by NetworkUtils combined with the port the client socket
+     /// is bound to.
+     /// </summary>
+     /// <returns>The local IPEndpoint</returns>
+     private IPEndPoint GetLocalEndpoint()
+     {
+         IPAddress address = NetworkUtils.LocalIPAddress;
+         int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
+         return new IPEndPoint(address, port);
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
new file mode 100644
index 0000000..2791eb3
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Impl;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Codec that can encode and decode a class depending on the class type.
+ /// </summary>
+ public class MultiCodec<T> : ICodec<T>
+ {
+     // One half handles outgoing objects, the other incoming bytes; every
+     // registration is applied to both so they stay in step.
+     private readonly MultiEncoder<T> _encoder = new MultiEncoder<T>();
+     private readonly MultiDecoder<T> _decoder = new MultiDecoder<T>();
+
+     /// <summary>
+     /// Creates a MultiCodec with no codecs registered yet.
+     /// </summary>
+     public MultiCodec()
+     {
+     }
+
+     /// <summary>
+     /// Registers the codec that handles objects of type U, for both
+     /// encoding and decoding.
+     /// </summary>
+     /// <typeparam name="U">The type handled by the codec</typeparam>
+     /// <param name="codec">The codec to use for objects of type U</param>
+     public void Register<U>(ICodec<U> codec) where U : T
+     {
+         _encoder.Register<U>(codec);
+         _decoder.Register<U>(codec);
+     }
+
+     /// <summary>
+     /// Registers the codec that handles objects of type U, for both
+     /// encoding and decoding, under an explicit class name.
+     /// </summary>
+     /// <typeparam name="U">The type handled by the codec</typeparam>
+     /// <param name="codec">The codec to use for objects of type U</param>
+     /// <param name="name">The name of the class to encode/decode</param>
+     public void Register<U>(ICodec<U> codec, string name) where U : T
+     {
+         _encoder.Register<U>(codec, name);
+         _decoder.Register<U>(codec, name);
+     }
+
+     /// <summary>
+     /// Encodes the object by handing it to the underlying MultiEncoder.
+     /// </summary>
+     /// <param name="obj">Data to encode</param>
+     /// <returns>Whatever the underlying MultiEncoder produces</returns>
+     public byte[] Encode(T obj)
+     {
+         byte[] encoded = _encoder.Encode(obj);
+         return encoded;
+     }
+
+     /// <summary>
+     /// Decodes the byte array into the appropriate object type by handing
+     /// it to the underlying MultiDecoder.
+     /// </summary>
+     /// <param name="data">Data to be decoded</param>
+     /// <returns>Whatever the underlying MultiDecoder produces</returns>
+     public T Decode(byte[] data)
+     {
+         T decoded = _decoder.Decode(data);
+         return decoded;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
new file mode 100644
index 0000000..789e226
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Decoder using the WakeTuple protocol buffer
+ /// (class name and bytes)
+ /// </summary>
+ public class MultiDecoder<T> : IDecoder<T>
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiDecoder<T>));
+
+     // Type -> IDecoder<that type>; stored as object because the generic
+     // argument differs per entry.
+     private readonly Dictionary<Type, object> _decoderMap;
+
+     // Class name carried in the tuple -> registered Type.
+     private readonly Dictionary<string, Type> _nameMap;
+
+     /// <summary>
+     /// Constructs a decoder that decodes bytes based on the class type
+     /// </summary>
+     public MultiDecoder()
+     {
+         _decoderMap = new Dictionary<Type, object>();
+         _nameMap = new Dictionary<string, Type>();
+     }
+
+     /// <summary>
+     /// Register the decoder for objects of type U, keyed by the
+     /// string form of typeof(U).
+     /// </summary>
+     /// <typeparam name="U">The type of decoder to use when decoding
+     /// objects of this type</typeparam>
+     /// <param name="decoder">The decoder to use when decoding
+     /// objects of this type</param>
+     public void Register<U>(IDecoder<U> decoder) where U : T
+     {
+         Type type = typeof(U);
+         _decoderMap[type] = decoder;
+         _nameMap[type.ToString()] = type;
+     }
+
+     /// <summary>
+     /// Register the decoder for objects of type U under an explicit name
+     /// </summary>
+     /// <typeparam name="U">The type of decoder to use when decoding
+     /// objects of this type</typeparam>
+     /// <param name="decoder">The decoder to use when decoding
+     /// objects of this type</param>
+     /// <param name="name">The name of the class to decode</param>
+     public void Register<U>(IDecoder<U> decoder, string name) where U : T
+     {
+         Type type = typeof(U);
+         _decoderMap[type] = decoder;
+         _nameMap[name] = type;
+     }
+
+     /// <summary>
+     /// Decodes byte array according to the underlying object type.
+     /// </summary>
+     /// <param name="data">The data to decode</param>
+     /// <returns>The decoded object, or default(T) when the tuple cannot
+     /// be deserialized, carries no class name, or names a class that was
+     /// never registered</returns>
+     public T Decode(byte[] data)
+     {
+         WakeTuplePBuf pbuf = WakeTuplePBuf.Deserialize(data);
+         if (pbuf == null)
+         {
+             return default(T);
+         }
+
+         // Get object's class Type. A tuple without a class name is treated
+         // like an unknown name; looking up a null key would throw instead.
+         Type type;
+         if (pbuf.className == null || !_nameMap.TryGetValue(pbuf.className, out type))
+         {
+             return default(T);
+         }
+
+         // Get decoder for that type
+         object decoder;
+         if (!_decoderMap.TryGetValue(type, out decoder))
+         {
+             Exceptions.Throw(new RemoteRuntimeException("Decoder for " + type + " not known."), LOGGER);
+         }
+
+         // Invoke the decoder to decode the byte array. Reflection is needed
+         // because the decoder's generic argument is only known at runtime.
+         Type handlerType = typeof(IDecoder<>).MakeGenericType(new[] { type });
+         MethodInfo info = handlerType.GetMethod("Decode");
+         return (T) info.Invoke(decoder, new[] { (object) pbuf.data });
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
new file mode 100644
index 0000000..cccf52f
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Encoder using the WakeTuple protocol buffer
+ /// (class name and bytes)
+ /// </summary>
+ public class MultiEncoder<T> : IEncoder<T>
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiEncoder<T>));
+
+     // Type -> IEncoder<that type>; stored as object because the generic
+     // argument differs per entry.
+     private readonly Dictionary<Type, object> _encoderMap;
+
+     // Type -> class name written into the tuple. Always populated together
+     // with _encoderMap, so a hit in one implies a hit in the other.
+     private readonly Dictionary<Type, string> _nameMap;
+
+     /// <summary>
+     /// Constructs an encoder that encodes an object to bytes based on the class name
+     /// </summary>
+     public MultiEncoder()
+     {
+         _encoderMap = new Dictionary<Type, object>();
+         _nameMap = new Dictionary<Type, string>();
+     }
+
+     /// <summary>
+     /// Register the encoder for objects of type U, using the string form
+     /// of typeof(U) as the class name.
+     /// </summary>
+     /// <typeparam name="U">The type handled by the encoder</typeparam>
+     /// <param name="encoder">The encoder to use for objects of type U</param>
+     public void Register<U>(IEncoder<U> encoder) where U : T
+     {
+         Type type = typeof(U);
+         _encoderMap[type] = encoder;
+         _nameMap[type] = type.ToString();
+     }
+
+     /// <summary>
+     /// Register the encoder for objects of type U under an explicit name
+     /// </summary>
+     /// <typeparam name="U">The type handled by the encoder</typeparam>
+     /// <param name="encoder">The encoder to use for objects of type U</param>
+     /// <param name="name">The class name to write into encoded tuples</param>
+     public void Register<U>(IEncoder<U> encoder, string name) where U : T
+     {
+         Type type = typeof(U);
+         _encoderMap[type] = encoder;
+         _nameMap[type] = name;
+         LOGGER.Log(Level.Verbose, "Registering name for " + name);
+     }
+
+     /// <summary>Encodes an object to a byte array</summary>
+     /// <param name="obj">The object to encode</param>
+     /// <returns>The serialized tuple of class name and encoded bytes, or
+     /// null when no encoder is registered for the object's runtime type</returns>
+     /// <exception cref="ArgumentNullException">If obj is null</exception>
+     public byte[] Encode(T obj)
+     {
+         if (obj == null)
+         {
+             throw new ArgumentNullException("obj");
+         }
+
+         // Find encoder for the object's runtime type
+         Type type = obj.GetType();
+         object encoder;
+         if (!_encoderMap.TryGetValue(type, out encoder))
+         {
+             return null;
+         }
+
+         // Invoke encoder for this type. Reflection is needed because the
+         // encoder's generic argument is only known at runtime.
+         Type handlerType = typeof(IEncoder<>).MakeGenericType(new[] { type });
+         MethodInfo info = handlerType.GetMethod("Encode");
+         byte[] data = (byte[]) info.Invoke(encoder, new[] { (object) obj });
+
+         // Serialize object type and object data into well known tuple
+         // To decode, deserialize the tuple, get object type, and look up the
+         // decoder for that type
+         string name = _nameMap[type];
+         LOGGER.Log(Level.Verbose, "Encoding name for " + name);
+         WakeTuplePBuf pbuf = new WakeTuplePBuf { className = name, data = data };
+         return pbuf.Serialize();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
new file mode 100644
index 0000000..577cd95
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.RX.Impl;
+using Org.Apache.Reef.Wake.Util;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Stores registered IObservers for DefaultRemoteManager.
+ /// Can register and look up IObservers by remote IPEndPoint.
+ /// </summary>
+ internal class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>>
+ {
+     private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
+     private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
+
+     // Observer registered for IPAddress.Any; receives every message.
+     private IObserver<T> _universalObserver;
+
+     /// <summary>
+     /// Constructs a new ObserverContainer used to manage remote IObservers.
+     /// </summary>
+     public ObserverContainer()
+     {
+         _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
+         _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
+     }
+
+     /// <summary>
+     /// Registers an IObserver used to handle incoming messages from the remote host
+     /// at the specified IPEndPoint. An endpoint whose address is IPAddress.Any
+     /// registers the observer for messages from every remote host.
+     /// </summary>
+     /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+     /// <param name="observer">The IObserver to handle incoming messages</param>
+     /// <returns>An IDisposable used to unregister the observer with</returns>
+     /// <exception cref="ArgumentNullException">If remoteEndpoint is null</exception>
+     public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+     {
+         if (remoteEndpoint == null)
+         {
+             throw new ArgumentNullException("remoteEndpoint");
+         }
+
+         if (remoteEndpoint.Address.Equals(IPAddress.Any))
+         {
+             _universalObserver = observer;
+             return Disposable.Create(() => { _universalObserver = null; });
+         }
+
+         _endpointMap[remoteEndpoint] = observer;
+         return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer));
+     }
+
+     /// <summary>
+     /// Registers an IObserver to handle incoming messages from a remote host.
+     /// The observer is stored under typeof(T).
+     /// </summary>
+     /// <param name="observer">The IObserver to handle incoming messages</param>
+     /// <returns>An IDisposable used to unregister the observer with</returns>
+     public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+     {
+         _typeMap[typeof(T)] = observer;
+         return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
+     }
+
+     /// <summary>
+     /// Look up the IObserver for the registered IPEndPoint or event type
+     /// and execute the IObserver. The universal observer, when present,
+     /// is invoked in addition to the endpoint or type observer.
+     /// </summary>
+     /// <param name="transportEvent">The incoming remote event</param>
+     /// <exception cref="WakeRuntimeException">If no registered observer
+     /// handled the message</exception>
+     public void OnNext(TransportEvent<IRemoteEvent<T>> transportEvent)
+     {
+         // The endpoints are taken from the link the event arrived on and
+         // written into the event before dispatch.
+         IRemoteEvent<T> remoteEvent = transportEvent.Data;
+         remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
+         remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
+         T value = remoteEvent.Value;
+         bool handled = false;
+
+         // Read the field once: the unregister action may null it out
+         // between a check and a call.
+         IObserver<T> universalObserver = _universalObserver;
+         if (universalObserver != null)
+         {
+             universalObserver.OnNext(value);
+             handled = true;
+         }
+
+         IObserver<T> endpointObserver;
+         IObserver<IRemoteMessage<T>> typeObserver;
+         if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out endpointObserver))
+         {
+             // IObserver was registered by IPEndpoint
+             endpointObserver.OnNext(value);
+             handled = true;
+         }
+         else if (value != null && _typeMap.TryGetValue(value.GetType(), out typeObserver))
+         {
+             // IObserver was registered by event type. A null value has no
+             // runtime type to look up, so it falls through to the
+             // unrecognized-message check instead of failing on GetType().
+             IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
+             IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
+             typeObserver.OnNext(remoteMessage);
+             handled = true;
+         }
+
+         if (!handled)
+         {
+             throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
+         }
+     }
+
+     /// <summary>
+     /// Intentionally does nothing; errors are not forwarded to the
+     /// registered observers.
+     /// </summary>
+     /// <param name="error">The reported error (ignored)</param>
+     public void OnError(Exception error)
+     {
+     }
+
+     /// <summary>
+     /// Intentionally does nothing; completion is not forwarded to the
+     /// registered observers.
+     /// </summary>
+     public void OnCompleted()
+     {
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
new file mode 100644
index 0000000..9e2fe2a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Net;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Mutable property bag implementing IRemoteEvent: the two endpoints,
+ /// source and sink names, a sequence value and the payload.
+ /// </summary>
+ public class RemoteEvent<T> : IRemoteEvent<T>
+ {
+     /// <summary>
+     /// Creates an event with every property left at its default value.
+     /// </summary>
+     public RemoteEvent()
+     {
+     }
+
+     /// <summary>
+     /// Creates an event carrying only endpoints and payload; Source, Sink
+     /// and Sequence keep their defaults (null, null and 0).
+     /// </summary>
+     /// <param name="localEndpoint">The local endpoint</param>
+     /// <param name="remoteEndpoint">The remote endpoint</param>
+     /// <param name="value">The payload</param>
+     public RemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
+         : this(localEndpoint, remoteEndpoint, null, null, 0L, value)
+     {
+     }
+
+     /// <summary>
+     /// Creates a fully populated event.
+     /// </summary>
+     /// <param name="localEndPoint">The local endpoint</param>
+     /// <param name="remoteEndPoint">The remote endpoint</param>
+     /// <param name="source">Stored in Source</param>
+     /// <param name="sink">Stored in Sink</param>
+     /// <param name="seq">Stored in Sequence</param>
+     /// <param name="value">The payload</param>
+     public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, string source, string sink, long seq, T value)
+     {
+         LocalEndPoint = localEndPoint;
+         RemoteEndPoint = remoteEndPoint;
+         Source = source;
+         Sink = sink;
+         Sequence = seq;
+         Value = value;
+     }
+
+     /// <summary>The local endpoint of the event</summary>
+     public IPEndPoint LocalEndPoint { get; set; }
+
+     /// <summary>The remote endpoint of the event</summary>
+     public IPEndPoint RemoteEndPoint { get; set; }
+
+     /// <summary>The source name carried by the event</summary>
+     public string Source { get; set; }
+
+     /// <summary>The sink name carried by the event</summary>
+     public string Sink { get; set; }
+
+     /// <summary>The payload</summary>
+     public T Value { get; set; }
+
+     /// <summary>The sequence value carried by the event</summary>
+     public long Sequence { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
new file mode 100644
index 0000000..2c5b16d
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Codec for IRemoteEvent messages, built from a RemoteEventEncoder and a
+ /// RemoteEventDecoder that both wrap the same payload codec.
+ /// </summary>
+ internal class RemoteEventCodec<T> : ICodec<IRemoteEvent<T>>
+ {
+     private readonly IEncoder<IRemoteEvent<T>> _encoder;
+     private readonly IDecoder<IRemoteEvent<T>> _decoder;
+
+     /// <summary>
+     /// Creates the codec around the given payload codec.
+     /// </summary>
+     /// <param name="codec">The codec used for the event's value</param>
+     public RemoteEventCodec(ICodec<T> codec)
+     {
+         _encoder = new RemoteEventEncoder<T>(codec);
+         _decoder = new RemoteEventDecoder<T>(codec);
+     }
+
+     /// <summary>Encodes the event with the wrapped RemoteEventEncoder</summary>
+     /// <param name="obj">The event to encode</param>
+     public byte[] Encode(IRemoteEvent<T> obj)
+     {
+         byte[] encoded = _encoder.Encode(obj);
+         return encoded;
+     }
+
+     /// <summary>Decodes the bytes with the wrapped RemoteEventDecoder</summary>
+     /// <param name="data">The bytes to decode</param>
+     public IRemoteEvent<T> Decode(byte[] data)
+     {
+         IRemoteEvent<T> decoded = _decoder.Decode(data);
+         return decoded;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
new file mode 100644
index 0000000..f9cfbc1
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Decodes a serialized WakeMessagePBuf into an IRemoteEvent, using the
+ /// wrapped decoder for the payload bytes.
+ /// </summary>
+ public class RemoteEventDecoder<T> : IDecoder<IRemoteEvent<T>>
+ {
+     private readonly IDecoder<T> _decoder;
+
+     /// <summary>
+     /// Creates the decoder around the given payload decoder.
+     /// </summary>
+     /// <param name="decoder">The decoder used for the event's value</param>
+     public RemoteEventDecoder(IDecoder<T> decoder)
+     {
+         _decoder = decoder;
+     }
+
+     /// <summary>
+     /// Deserializes the message and decodes its payload.
+     /// </summary>
+     /// <param name="data">The serialized WakeMessagePBuf</param>
+     /// <returns>A RemoteEvent holding source, sink, sequence and decoded
+     /// value; both endpoints are left null</returns>
+     public IRemoteEvent<T> Decode(byte[] data)
+     {
+         // NOTE(review): other decoders in this code base check the result
+         // of Deserialize for null; confirm whether that can happen here.
+         WakeMessagePBuf pbuf = WakeMessagePBuf.Deserialize(data);
+
+         // Endpoints are not part of the wire message, so none are set here.
+         return new RemoteEvent<T>(null, null, pbuf.source, pbuf.sink, pbuf.seq, _decoder.Decode(pbuf.data));
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
new file mode 100644
index 0000000..432e688
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Encodes an IRemoteEvent into a serialized WakeMessagePBuf, using the
+ /// wrapped encoder for the payload.
+ /// </summary>
+ public class RemoteEventEncoder<T> : IEncoder<IRemoteEvent<T>>
+ {
+     private readonly IEncoder<T> _encoder;
+
+     /// <summary>
+     /// Creates the encoder around the given payload encoder.
+     /// </summary>
+     /// <param name="encoder">The encoder used for the event's value</param>
+     public RemoteEventEncoder(IEncoder<T> encoder)
+     {
+         _encoder = encoder;
+     }
+
+     /// <summary>
+     /// Copies sink, source, encoded value and sequence into a
+     /// WakeMessagePBuf and serializes it.
+     /// </summary>
+     /// <param name="obj">The event to encode</param>
+     /// <returns>The serialized message</returns>
+     public byte[] Encode(IRemoteEvent<T> obj)
+     {
+         WakeMessagePBuf message = new WakeMessagePBuf
+         {
+             sink = obj.Sink,
+             source = obj.Source,
+             data = _encoder.Encode(obj.Value),
+             seq = obj.Sequence
+         };
+         return message.Serialize();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
new file mode 100644
index 0000000..10a048e
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Holds the remote identifier it was constructed with.
+ /// </summary>
+ public class RemoteEventEndPoint<T>
+ {
+     /// <summary>
+     /// Creates the endpoint for the given identifier.
+     /// </summary>
+     /// <param name="id">The identifier to expose through Id</param>
+     public RemoteEventEndPoint(IRemoteIdentifier id)
+     {
+         Id = id;
+     }
+
+     /// <summary>
+     /// The identifier passed to the constructor
+     /// </summary>
+     public IRemoteIdentifier Id { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
new file mode 100644
index 0000000..2f402a8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using System.Globalization;
+using System.Net;
+using System.Text;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Remote identifier based on a socket address
+ /// </summary>
+ public class SocketRemoteIdentifier : IRemoteIdentifier
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(SocketRemoteIdentifier));
+ private readonly IPEndPoint _addr;
+
+ /// <summary>
+ /// Creates an identifier for the given socket address.
+ /// </summary>
+ /// <param name="addr">The endpoint this identifier refers to</param>
+ public SocketRemoteIdentifier(IPEndPoint addr)
+ {
+ _addr = addr;
+ }
+
+ /// <summary>
+ /// Creates an identifier from a string of the form "address:port".
+ /// The text before the first ':' is parsed with IPAddress.Parse, so it
+ /// must be an IP address literal; the text after it is parsed as an
+ /// integer port using the invariant culture.
+ /// </summary>
+ /// <param name="str">The "address:port" string to parse</param>
+ public SocketRemoteIdentifier(string str)
+ {
+ int index = str.IndexOf(":", System.StringComparison.Ordinal);
+ if (index <= 0)
+ {
+ // No separator, or nothing in front of it: report the malformed name
+ Exceptions.Throw(new RemoteRuntimeException("Invalid name " + str), LOGGER);
+ }
+ string host = str.Substring(0, index);
+ int port = int.Parse(str.Substring(index + 1), CultureInfo.InvariantCulture);
+ _addr = new IPEndPoint(IPAddress.Parse(host), port);
+ }
+
+ /// <summary>
+ /// Gets the socket address wrapped by this identifier.
+ /// </summary>
+ public IPEndPoint Addr
+ {
+ get { return _addr; }
+ }
+
+ /// <summary>
+ /// Returns the hash code of the wrapped socket address.
+ /// </summary>
+ public override int GetHashCode()
+ {
+ return _addr.GetHashCode();
+ }
+
+ /// <summary>
+ /// Two identifiers are equal when they wrap equal socket addresses.
+ /// </summary>
+ /// <param name="obj">The object to compare against</param>
+ /// <returns>True if obj is a SocketRemoteIdentifier with an equal address;
+ /// false otherwise, including when obj is null or of another type</returns>
+ public override bool Equals(object obj)
+ {
+ // Equals must not throw: a null or foreign object is simply "not equal"
+ SocketRemoteIdentifier other = obj as SocketRemoteIdentifier;
+ return other != null && _addr.Equals(other._addr);
+ }
+
+ /// <summary>
+ /// Returns the address prefixed with the "socket://" scheme.
+ /// </summary>
+ public override string ToString()
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.Append("socket://");
+ builder.Append(_addr);
+ return builder.ToString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
new file mode 100644
index 0000000..4e25b18
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Codec that converts strings to and from UTF-8 encoded bytes.
+ /// UTF-8 produces the same bytes as ASCII for ASCII-only strings, but
+ /// unlike ASCII it does not replace other characters with '?'.
+ /// </summary>
+ public class StringCodec : ICodec<string>
+ {
+ [Inject]
+ public StringCodec()
+ {
+ }
+
+ /// <summary>
+ /// Encodes the string as UTF-8 bytes.
+ /// </summary>
+ /// <param name="obj">The string to encode</param>
+ /// <returns>The UTF-8 representation of the string</returns>
+ public byte[] Encode(string obj)
+ {
+ return Encoding.UTF8.GetBytes(obj);
+ }
+
+ /// <summary>
+ /// Decodes UTF-8 bytes back into a string.
+ /// </summary>
+ /// <param name="data">The bytes to decode</param>
+ /// <returns>The decoded string</returns>
+ public string Decode(byte[] data)
+ {
+ return Encoding.UTF8.GetString(data);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
new file mode 100644
index 0000000..31829cd
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Identifier backed by a plain string. Equality and hashing are
+ /// delegated to the wrapped string.
+ /// </summary>
+ public class StringIdentifier : IIdentifier
+ {
+ private readonly string _str;
+
+ /// <summary>
+ /// Creates an identifier wrapping the given string.
+ /// </summary>
+ /// <param name="s">The string form of the identifier; stored as-is</param>
+ public StringIdentifier(string s)
+ {
+ _str = s;
+ }
+
+ /// <summary>
+ /// Returns the hash code of the wrapped string, or 0 when it is null.
+ /// </summary>
+ public override int GetHashCode()
+ {
+ // Guard against a null string so hashing never throws
+ return _str == null ? 0 : _str.GetHashCode();
+ }
+
+ /// <summary>
+ /// Two identifiers are equal when their wrapped strings are equal
+ /// (ordinal comparison).
+ /// </summary>
+ /// <param name="o">The object to compare against</param>
+ /// <returns>True if o is a StringIdentifier wrapping an equal string</returns>
+ public override bool Equals(object o)
+ {
+ StringIdentifier other = o as StringIdentifier;
+
+ // The static overload tolerates null on either side
+ return other != null && string.Equals(_str, other._str, StringComparison.Ordinal);
+ }
+
+ /// <summary>
+ /// Returns the wrapped string.
+ /// </summary>
+ public override string ToString()
+ {
+ return _str;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
new file mode 100644
index 0000000..b9bcb50
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Identifier factory that produces StringIdentifier instances.
+ /// </summary>
+ public class StringIdentifierFactory : IIdentifierFactory
+ {
+ /// <summary>
+ /// Injectable parameterless constructor.
+ /// </summary>
+ [Inject]
+ public StringIdentifierFactory()
+ {
+ }
+
+ /// <summary>
+ /// Wraps the given string in a new StringIdentifier.
+ /// </summary>
+ /// <param name="s">The string form of the identifier</param>
+ /// <returns>A new StringIdentifier built from s</returns>
+ public IIdentifier Create(string s)
+ {
+ IIdentifier identifier = new StringIdentifier(s);
+ return identifier;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
new file mode 100644
index 0000000..848a915
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Establish connections to TransportServer for remote message passing
+ /// </summary>
+ public class TransportClient<T> : IDisposable
+ {
+ private readonly ILink<T> _link;
+ private readonly IObserver<TransportEvent<T>> _observer;
+ private readonly CancellationTokenSource _cancellationSource;
+ private bool _disposed;
+
+ /// <summary>
+ /// Construct a TransportClient.
+ /// Used to send messages to the specified remote endpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+ /// <param name="codec">Codec used to encode/decode messages</param>
+ /// <exception cref="ArgumentNullException">If remoteEndpoint or codec is null</exception>
+ public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+ if (codec == null)
+ {
+ throw new ArgumentNullException("codec");
+ }
+
+ _link = new Link<T>(remoteEndpoint, codec);
+ _cancellationSource = new CancellationTokenSource();
+ _disposed = false;
+ }
+
+ /// <summary>
+ /// Construct a TransportClient.
+ /// Used to send messages to the specified remote endpoint. A background
+ /// task is started that reads responses from the link and forwards them
+ /// to the observer.
+ /// </summary>
+ /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+ /// <param name="codec">Codec used to encode/decode messages</param>
+ /// <param name="observer">Callback used when receiving responses from remote host</param>
+ public TransportClient(IPEndPoint remoteEndpoint,
+ ICodec<T> codec,
+ IObserver<TransportEvent<T>> observer)
+ : this(remoteEndpoint, codec)
+ {
+ _observer = observer;
+ Task.Run(() => ResponseLoop());
+ }
+
+ /// <summary>
+ /// Gets the underlying transport link.
+ /// </summary>
+ public ILink<T> Link
+ {
+ get { return _link; }
+ }
+
+ /// <summary>
+ /// Send the remote message.
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ /// <exception cref="ArgumentNullException">If message is null</exception>
+ public void Send(T message)
+ {
+ if (message == null)
+ {
+ throw new ArgumentNullException("message");
+ }
+
+ _link.Write(message);
+ }
+
+ /// <summary>
+ /// Close all opened connections
+ /// </summary>
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Stops the response loop and disposes the underlying link.
+ /// Does nothing if already disposed or if disposing is false.
+ /// </summary>
+ /// <param name="disposing">True when called from Dispose()</param>
+ protected void Dispose(bool disposing)
+ {
+ if (!_disposed && disposing)
+ {
+ try
+ {
+ // Signal the response loop to stop; without this the token
+ // handed to ReadAsync is never cancelled.
+ _cancellationSource.Cancel();
+ }
+ finally
+ {
+ // The link must be released even if a cancellation callback throws
+ _link.Dispose();
+ _disposed = true;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Continually read responses from remote host and pass each one to the
+ /// observer. The loop ends when cancellation is requested or when a read
+ /// yields null.
+ /// </summary>
+ private async Task ResponseLoop()
+ {
+ try
+ {
+ while (!_cancellationSource.IsCancellationRequested)
+ {
+ T message = await _link.ReadAsync(_cancellationSource.Token);
+ if (message == null)
+ {
+ break;
+ }
+
+ TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
+ _observer.OnNext(transportEvent);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // Cancellation requested by Dispose is the normal way out of the
+ // loop; anything else is a genuine failure and is rethrown.
+ if (!_cancellationSource.IsCancellationRequested)
+ {
+ throw;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
new file mode 100644
index 0000000..6c4644c
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Pairs a piece of transported data with the link it is associated with.
+ /// </summary>
+ /// <typeparam name="T">Type of the transported data</typeparam>
+ public class TransportEvent<T>
+ {
+ private readonly T _data;
+ private readonly ILink<T> _link;
+
+ /// <summary>
+ /// Creates an event from the given data and link.
+ /// </summary>
+ /// <param name="data">The transported data</param>
+ /// <param name="link">The link associated with the data</param>
+ public TransportEvent(T data, ILink<T> link)
+ {
+ _data = data;
+ _link = link;
+ }
+
+ /// <summary>
+ /// Gets the data supplied at construction.
+ /// </summary>
+ public T Data
+ {
+ get { return _data; }
+ }
+
+ /// <summary>
+ /// Gets the link supplied at construction.
+ /// </summary>
+ public ILink<T> Link
+ {
+ get { return _link; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
new file mode 100644
index 0000000..20cdc8a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+using System.Globalization;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Wake.Util;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Server to handle incoming remote messages.
+ /// </summary>
+ public class TransportServer<T> : IDisposable
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>));
+
+ private TcpListener _listener;
+ private CancellationTokenSource _cancellationSource;
+ private IObserver<TransportEvent<T>> _remoteObserver;
+ private ICodec<T> _codec;
+ private bool _disposed;
+ private Task _serverTask;
+
+ /// <summary>
+ /// Constructs a TransportServer to listen for remote events.
+ /// Listens on the given port of the local IP address. When it receives a
+ /// remote event, it will invoke the specified remote handler.
+ /// </summary>
+ /// <param name="port">Port to listen on</param>
+ /// <param name="remoteHandler">The handler to invoke when receiving incoming
+ /// remote messages</param>
+ /// <param name="codec">The codec to encode/decode</param>
+ public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec)
+ : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec)
+ {
+ }
+
+ /// <summary>
+ /// Constructs a TransportServer to listen for remote events.
+ /// Listens on the specified local endpoint. When it receives a remote
+ /// event, it will invoke the specified remote handler.
+ /// </summary>
+ /// <param name="localEndpoint">Endpoint to listen on</param>
+ /// <param name="remoteHandler">The handler to invoke when receiving incoming
+ /// remote messages</param>
+ /// <param name="codec">The codec to encode/decode</param>
+ public TransportServer(IPEndPoint localEndpoint,
+ IObserver<TransportEvent<T>> remoteHandler,
+ ICodec<T> codec)
+ {
+ _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
+ _remoteObserver = remoteHandler;
+ _cancellationSource = new CancellationTokenSource();
+ _codec = codec;
+ _disposed = false;
+ }
+
+ /// <summary>
+ /// Returns the listening endpoint for the TransportServer
+ /// </summary>
+ public IPEndPoint LocalEndpoint
+ {
+ get { return _listener.LocalEndpoint as IPEndPoint; }
+ }
+
+ /// <summary>
+ /// Starts listening for incoming remote messages.
+ /// The accept loop runs on a background task.
+ /// </summary>
+ public void Run()
+ {
+ _listener.Start();
+ _serverTask = Task.Run(() => StartServer());
+ }
+
+ /// <summary>
+ /// Close the TransportServer and all open connections
+ /// </summary>
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Cancels outstanding work, stops the listener and waits briefly for
+ /// the server task to finish. Only acts when disposing is true and the
+ /// server has not been disposed already.
+ /// </summary>
+ /// <param name="disposing">True when called from Dispose()</param>
+ public void Dispose(bool disposing)
+ {
+ if (!_disposed && disposing)
+ {
+ _cancellationSource.Cancel();
+ try
+ {
+ _listener.Stop();
+ }
+ catch (SocketException)
+ {
+ LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
+ }
+
+ if (_serverTask != null)
+ {
+ // Give the TransportServer Task 500ms to shut down, ignore any timeout errors.
+ // This bounded wait is the only wait: an unbounded one in front of it
+ // would make the timeout meaningless and could throw out of Dispose.
+ try
+ {
+ using (CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500))
+ {
+ _serverTask.Wait(serverDisposeTimeout.Token);
+ }
+ }
+ catch (Exception e)
+ {
+ Console.Error.WriteLine(e);
+ }
+ finally
+ {
+ // Task.Dispose throws InvalidOperationException unless the task
+ // has reached a final state, which is not guaranteed after a timeout
+ if (_serverTask.IsCompleted)
+ {
+ _serverTask.Dispose();
+ }
+ }
+ }
+ }
+
+ _disposed = true;
+ }
+
+ /// <summary>
+ /// Helper method to start TransportServer. This will
+ /// be run in an asynchronous Task.
+ /// </summary>
+ /// <returns>An asynchronous Task for the running server.</returns>
+ private async Task StartServer()
+ {
+ try
+ {
+ while (!_cancellationSource.Token.IsCancellationRequested)
+ {
+ TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+
+ // Each client is served on its own task, which is not awaited here
+ ProcessClient(client).Forget();
+ }
+ }
+ catch (InvalidOperationException)
+ {
+ LOGGER.Log(Level.Info, "TransportServer has been closed.");
+ }
+ catch (OperationCanceledException)
+ {
+ LOGGER.Log(Level.Info, "TransportServer has been closed.");
+ }
+ }
+
+ /// <summary>
+ /// Receives events from the connected TcpClient and invokes the handler on
+ /// each one. Stops when cancellation is requested or a read yields null.
+ /// </summary>
+ /// <param name="client">The connected client</param>
+ private async Task ProcessClient(TcpClient client)
+ {
+ // Keep reading messages from client until they disconnect or timeout
+ CancellationToken token = _cancellationSource.Token;
+ using (ILink<T> link = new Link<T>(client, _codec))
+ {
+ while (!token.IsCancellationRequested)
+ {
+ T message = await link.ReadAsync(token);
+
+ // Check before notifying, as TransportClient does, so the handler
+ // never receives an event that carries no data
+ if (message == null)
+ {
+ break;
+ }
+
+ TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
+ _remoteObserver.OnNext(transportEvent);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs b/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs
new file mode 100644
index 0000000..247fff9
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos
+{
+ /// <summary>
+ /// Serialization helpers for the Wake message protocol buffer type.
+ /// </summary>
+ public partial class WakeMessagePBuf
+ {
+ /// <summary>
+ /// Reads a WakeMessagePBuf from its serialized bytes.
+ /// </summary>
+ /// <param name="bytes">The serialized message</param>
+ /// <returns>The deserialized message</returns>
+ public static WakeMessagePBuf Deserialize(byte[] bytes)
+ {
+ using (MemoryStream input = new MemoryStream(bytes))
+ {
+ return Serializer.Deserialize<WakeMessagePBuf>(input);
+ }
+ }
+
+ /// <summary>
+ /// Writes this message to a new byte array.
+ /// </summary>
+ /// <returns>The serialized message</returns>
+ public byte[] Serialize()
+ {
+ byte[] serialized;
+ using (MemoryStream output = new MemoryStream())
+ {
+ Serializer.Serialize(output, this);
+ serialized = output.ToArray();
+ }
+ return serialized;
+ }
+ }
+
+ /// <summary>
+ /// Serialization helpers for the Wake tuple protocol buffer type.
+ /// </summary>
+ public partial class WakeTuplePBuf
+ {
+ /// <summary>
+ /// Reads a WakeTuplePBuf from its serialized bytes.
+ /// </summary>
+ /// <param name="bytes">The serialized tuple</param>
+ /// <returns>The deserialized tuple</returns>
+ public static WakeTuplePBuf Deserialize(byte[] bytes)
+ {
+ using (MemoryStream input = new MemoryStream(bytes))
+ {
+ return Serializer.Deserialize<WakeTuplePBuf>(input);
+ }
+ }
+
+ /// <summary>
+ /// Writes this tuple to a new byte array.
+ /// </summary>
+ /// <returns>The serialized tuple</returns>
+ public byte[] Serialize()
+ {
+ byte[] serialized;
+ using (MemoryStream output = new MemoryStream())
+ {
+ Serializer.Serialize(output, this);
+ serialized = output.ToArray();
+ }
+ return serialized;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs b/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs
new file mode 100644
index 0000000..633cc79
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ /// <summary>
+ /// Container for the named parameters that configure remote messaging.
+ /// Each nested class only declares a parameter name and its value type.
+ /// </summary>
+ public class RemoteConfiguration
+ {
+ /// <summary>
+ /// Named parameter (short name "rm_name"): the name of the remote manager.
+ /// </summary>
+ [NamedParameter(shortName: "rm_name", documentation: "The name of the remote manager.")]
+ public class ManagerName : Name<string>
+ {
+ }
+
+ /// <summary>
+ /// Named parameter (short name "rm_host"): the host address to be used for messages.
+ /// </summary>
+ [NamedParameter(shortName: "rm_host", documentation: "The host address to be used for messages.")]
+ public class HostAddress : Name<string>
+ {
+ }
+
+ /// <summary>
+ /// Named parameter (short name "rm_port"): the port to be used for messages.
+ /// </summary>
+ [NamedParameter(shortName: "rm_port", documentation: "The port to be used for messages.")]
+ public class Port : Name<int>
+ {
+ }
+
+ /// <summary>
+ /// Named parameter: the codec to be used for messages.
+ /// NOTE(review): the value type is declared as ICodec&lt;Type&gt;, i.e. a codec of
+ /// System.Type - confirm that this is the intended type argument.
+ /// </summary>
+ [NamedParameter(documentation: "The codec to be used for messages.")]
+ public class MessageCodec : Name<ICodec<Type>>
+ {
+ }
+
+ /// <summary>
+ /// Named parameter: the observer that is handed exceptions.
+ /// </summary>
+ [NamedParameter(documentation: "The event handler to be used for exception")]
+ public class ErrorHandler : Name<IObserver<Exception>>
+ {
+ }
+
+ /// <summary>
+ /// Named parameter (short name "rm_order"): whether to use the message
+ /// ordering guarantee. The declared default value is "true".
+ /// </summary>
+ [NamedParameter(shortName: "rm_order", documentation: "Whether or not to use the message ordering guarantee", defaultValue: "true")]
+ public class OrderingGuarantee : Name<bool>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs b/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs
new file mode 100644
index 0000000..061e532
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ /// <summary>Wake remote runtime exception</summary>
+ [System.Serializable]
+ public class RemoteRuntimeException : Exception
+ {
+ /// <summary>Constructs a new runtime remote exception with the specified detail message and cause
+ /// </summary>
+ /// <param name="s">the detailed message</param>
+ /// <param name="e">the cause</param>
+ public RemoteRuntimeException(string s, Exception e)
+ : base(s, e)
+ {
+ }
+
+ /// <summary>Constructs a new runtime remote exception with the specified detail message
+ /// </summary>
+ /// <param name="s">the detailed message</param>
+ public RemoteRuntimeException(string s)
+ : base(s)
+ {
+ }
+
+ /// <summary>Constructs a new runtime remote exception with the specified cause.
+ /// The message is fixed to "Runtime Exception".</summary>
+ /// <param name="e">the cause</param>
+ public RemoteRuntimeException(Exception e)
+ : base("Runtime Exception", e)
+ {
+ }
+
+ /// <summary>Constructs a runtime remote exception from serialized data.
+ /// A type marked Serializable needs this constructor to be deserialized;
+ /// without it, deserializing the exception fails.</summary>
+ /// <param name="info">the serialized object data</param>
+ /// <param name="context">the source or destination of the serialized data</param>
+ protected RemoteRuntimeException(
+ System.Runtime.Serialization.SerializationInfo info,
+ System.Runtime.Serialization.StreamingContext context)
+ : base(info, context)
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs b/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs
new file mode 100644
index 0000000..611b5a1
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.Time
+{
+ /// <summary>
+ /// A timer event: a Time that carries the handler to notify.
+ /// </summary>
+ public abstract class Alarm : Time
+ {
+ private IObserver<Alarm> _handler;
+
+ /// <summary>
+ /// Creates an alarm for the given timestamp.
+ /// </summary>
+ /// <param name="timestamp">Timestamp passed on to the Time base class</param>
+ /// <param name="handler">Observer that Handle() notifies</param>
+ public Alarm(long timestamp, IObserver<Alarm> handler)
+ : base(timestamp)
+ {
+ this._handler = handler;
+ }
+
+ /// <summary>
+ /// Passes this alarm to the handler's OnNext.
+ /// </summary>
+ public void Handle()
+ {
+ IObserver<Alarm> target = this._handler;
+ target.OnNext(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs b/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs
new file mode 100644
index 0000000..4cea1bc
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Time
+{
+ /// <summary>
+ /// The Time at which a component started.
+ /// </summary>
+ public class StartTime : Time
+ {
+ /// <summary>
+ /// Creates a start time for the given timestamp.
+ /// </summary>
+ /// <param name="timeStamp">Timestamp passed on to the Time base class</param>
+ public StartTime(long timeStamp)
+ : base(timeStamp)
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs b/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs
new file mode 100644
index 0000000..8e3bf65
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Time
+{
+ /// <summary>
+ /// The Time at which a component stops.
+ /// </summary>
+ public class StopTime : Time
+ {
+ /// <summary>
+ /// Creates a stop time for the given timestamp.
+ /// </summary>
+ /// <param name="timeStamp">Timestamp passed on to the Time base class</param>
+ public StopTime(long timeStamp)
+ : base(timeStamp)
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/IClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/IClock.cs b/lang/cs/Source/WAKE/Wake/Time/IClock.cs
new file mode 100644
index 0000000..6d906b8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/IClock.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Impl;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.Reef.Wake.Time
+{
+    /// <summary>
+    /// Abstract clock. Allows alarms to be scheduled and declares the named
+    /// parameters through which handlers for the clock's events
+    /// (start, stop, runtime start, runtime stop, idle) are bound.
+    /// </summary>
+    public abstract class IClock : IDisposable
+    {
+        /// <summary>
+        /// Schedule an Alarm at the given future offset
+        /// </summary>
+        /// <param name="offset">The offset in the future to schedule the alarm</param>
+        /// <param name="handler">The IObserver to be called</param>
+        public abstract void ScheduleAlarm(long offset, IObserver<Alarm> handler);
+
+        /// <summary>
+        /// Clock is idle if it has no future alarms set
+        /// </summary>
+        /// <returns>True if no future alarms are set, otherwise false</returns>
+        public abstract bool IsIdle();
+
+        /// <summary>
+        /// Dispose of the clock and all scheduled alarms
+        /// </summary>
+        public abstract void Dispose();
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the StartTime Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the start event", defaultClass: typeof(MissingStartHandlerHandler))]
+        public class StartHandler : Name<ISet<IObserver<StartTime>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the StopTime Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the stop event", defaultClass: typeof(LoggingEventHandler<StopTime>))]
+        public class StopHandler : Name<ISet<IObserver<StopTime>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the RuntimeStart Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the runtime start event", defaultClass: typeof(LoggingEventHandler<RuntimeStart>))]
+        public class RuntimeStartHandler : Name<ISet<IObserver<RuntimeStart>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the RuntimeStop Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the runtime stop event", defaultClass: typeof(LoggingEventHandler<RuntimeStop>))]
+        public class RuntimeStopHandler : Name<ISet<IObserver<RuntimeStop>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the IdleClock Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the Idle event", defaultClass: typeof(LoggingEventHandler<IdleClock>))]
+        public class IdleHandler : Name<ISet<IObserver<IdleClock>>>
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs
new file mode 100644
index 0000000..ac95896
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    /// <summary>
+    /// Alarm subtype that adds no members of its own.
+    /// NOTE(review): presumably distinguishes alarms scheduled by client code
+    /// from RuntimeAlarm instances - confirm against the clock implementation.
+    /// </summary>
+    public class ClientAlarm : Alarm
+    {
+        /// <summary>
+        /// Creates a ClientAlarm by forwarding both arguments to the Alarm base constructor.
+        /// </summary>
+        /// <param name="timestamp">The timestamp passed to the Alarm base constructor</param>
+        /// <param name="handler">The observer passed to the Alarm base constructor</param>
+        public ClientAlarm(long timestamp, IObserver<Alarm> handler)
+            : base(timestamp, handler)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs
new file mode 100644
index 0000000..3ca1fd8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    /// <summary>
+    /// Time event for the clock's idle notification; it is the event type
+    /// observed by handlers bound to IClock.IdleHandler.
+    /// </summary>
+    public class IdleClock : Time
+    {
+        /// <summary>
+        /// Creates an IdleClock event carrying the given timestamp.
+        /// </summary>
+        /// <param name="timestamp">The timestamp handed on to the Time base constructor</param>
+        public IdleClock(long timestamp)
+            : base(timestamp)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs
new file mode 100644
index 0000000..701bf88
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    /// <summary>
+    /// Alarm subtype that adds no members of its own.
+    /// NOTE(review): presumably distinguishes alarms scheduled by the runtime
+    /// from ClientAlarm instances - confirm against the clock implementation.
+    /// </summary>
+    public class RuntimeAlarm : Alarm
+    {
+        /// <summary>
+        /// Creates a RuntimeAlarm by forwarding both arguments to the Alarm base constructor.
+        /// </summary>
+        /// <param name="timestamp">The timestamp passed to the Alarm base constructor</param>
+        /// <param name="handler">The observer passed to the Alarm base constructor</param>
+        public RuntimeAlarm(long timestamp, IObserver<Alarm> handler)
+            : base(timestamp, handler)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs
new file mode 100644
index 0000000..d08cccc
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    /// <summary>
+    /// Time event for the runtime start notification; it is the event type
+    /// observed by handlers bound to IClock.RuntimeStartHandler.
+    /// </summary>
+    public class RuntimeStart : Time
+    {
+        /// <summary>
+        /// Creates a RuntimeStart event carrying the given timestamp.
+        /// </summary>
+        /// <param name="timeStamp">The timestamp handed on to the Time base constructor</param>
+        public RuntimeStart(long timeStamp)
+            : base(timeStamp)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs
new file mode 100644
index 0000000..6324e20
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    /// <summary>
+    /// Time event for the runtime stop notification, optionally carrying an exception.
+    /// </summary>
+    public class RuntimeStop : Time
+    {
+        /// <summary>
+        /// The exception supplied at construction, or null when none was given.
+        /// NOTE(review): presumably the error that caused the runtime to stop - confirm with callers.
+        /// </summary>
+        public Exception Exception { get; private set; }
+
+        /// <summary>
+        /// Creates a RuntimeStop event without an exception (Exception is null).
+        /// </summary>
+        /// <param name="timestamp">The timestamp handed on to the Time base constructor</param>
+        public RuntimeStop(long timestamp)
+            : this(timestamp, null)
+        {
+        }
+
+        /// <summary>
+        /// Creates a RuntimeStop event that records the given exception.
+        /// </summary>
+        /// <param name="timestamp">The timestamp handed on to the Time base constructor</param>
+        /// <param name="e">The exception to expose through the Exception property; may be null</param>
+        public RuntimeStop(long timestamp, Exception e)
+            : base(timestamp)
+        {
+            Exception = e;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs
new file mode 100644
index 0000000..c932aa4
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime
+{
+    /// <summary>
+    /// Source of time for the clock. RealTimer is declared as the default implementation.
+    /// </summary>
+    [DefaultImplementation(typeof(RealTimer))]
+    public interface ITimer
+    {
+        /// <summary>
+        /// The current time as reported by this timer
+        /// </summary>
+        long CurrentTime { get; }
+
+        /// <summary>
+        /// Computes the difference between the given time and the current time
+        /// </summary>
+        /// <param name="time">The time to compare against the current time</param>
+        /// <returns>The difference between the given time and the current time</returns>
+        long GetDuration(long time);
+
+        /// <summary>
+        /// Tells whether the given time has already passed
+        /// </summary>
+        /// <param name="time">The time to check</param>
+        /// <returns>True if the given time has passed, otherwise false</returns>
+        bool IsReady(long time);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs
new file mode 100644
index 0000000..f4b6c75
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime
+{
+    /// <summary>
+    /// ITimer implementation intended for tests: every member returns a
+    /// fixed value and never consults a real clock.
+    /// </summary>
+    public class LogicalTimer : ITimer
+    {
+        /// <summary>
+        /// Injectable parameterless constructor; there is no state to initialise.
+        /// </summary>
+        [Inject]
+        public LogicalTimer()
+        {
+        }
+
+        /// <summary>
+        /// Always 0
+        /// </summary>
+        public long CurrentTime
+        {
+            get
+            {
+                return 0;
+            }
+        }
+
+        /// <summary>
+        /// Always reports a duration of 0; the argument is not used
+        /// </summary>
+        /// <param name="time">Ignored</param>
+        /// <returns>0</returns>
+        public long GetDuration(long time)
+        {
+            return 0;
+        }
+
+        /// <summary>
+        /// Always reports that the given time is ready; the argument is not used
+        /// </summary>
+        /// <param name="time">Ignored</param>
+        /// <returns>true</returns>
+        public bool IsReady(long time)
+        {
+            return true;
+        }
+    }
+}