You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:42:54 UTC

[10/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code base

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Protobuf/WakeRemoteProtosGen.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Protobuf/WakeRemoteProtosGen.cs b/lang/cs/Source/WAKE/Wake/Protobuf/WakeRemoteProtosGen.cs
new file mode 100644
index 0000000..e4b3f2b
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Protobuf/WakeRemoteProtosGen.cs
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//------------------------------------------------------------------------------
+// <auto-generated>
+//     This code was generated by a tool.
+//
+//     Changes to this file may cause incorrect behavior and will be lost if
+//     the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: src/main/proto/RemoteProtocol.proto
+namespace Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos
+{
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeMessagePBuf")]
+  public partial class WakeMessagePBuf : global::ProtoBuf.IExtensible
+  {
+    public WakeMessagePBuf() {}
+    
+    private byte[] _data;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public byte[] data
+    {
+      get { return _data; }
+      set { _data = value; }
+    }
+    private long _seq;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"seq", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public long seq
+    {
+      get { return _seq; }
+      set { _seq = value; }
+    }
+    private string _source = "";
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"source", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue("")]
+    public string source
+    {
+      get { return _source; }
+      set { _source = value; }
+    }
+    private string _sink = "";
+    [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"sink", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue("")]
+    public string sink
+    {
+      get { return _sink; }
+      set { _sink = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeTuplePBuf")]
+  public partial class WakeTuplePBuf : global::ProtoBuf.IExtensible
+  {
+    public WakeTuplePBuf() {}
+    
+    private string _className;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"className", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string className
+    {
+      get { return _className; }
+      set { _className = value; }
+    }
+    private byte[] _data;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public byte[] data
+    {
+      get { return _data; }
+      set { _data = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/AbstractObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/AbstractObserver.cs b/lang/cs/Source/WAKE/Wake/RX/AbstractObserver.cs
new file mode 100644
index 0000000..b452a24
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/AbstractObserver.cs
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+    /// <summary>
+    /// An observer with logging-only onError and onCompleted() methods.
+    /// </summary>
+    /// <typeparam name="T">The observer type</typeparam>
+    public abstract class AbstractObserver<T> : IObserver<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(AbstractObserver<T>));
+
+        public virtual void OnError(Exception error)
+        {
+            LOGGER.Log(Level.Info, "The observer " + GetType() + "has received an Exception: " + error);
+        }
+
+        public virtual void OnCompleted()
+        {
+            LOGGER.Log(Level.Verbose, "The observer " + GetType() + "has received an onCompleted() ");
+        }
+
+        public abstract void OnNext(T arg1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/AbstractRxStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/AbstractRxStage.cs b/lang/cs/Source/WAKE/Wake/RX/AbstractRxStage.cs
new file mode 100644
index 0000000..20c7431
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/AbstractRxStage.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+    /// <summary>
+    /// An Rx stage that implements metering
+    /// </summary>
+    public abstract class AbstractRxStage<T> : IRxStage<T>
+    {
+        //protected internal readonly Meter meter;
+
+        /// <summary>Constructs an abstact rxstage</summary>
+        /// <param name="meterName">the name of the meter</param>
+        public AbstractRxStage(string meterName)
+        {
+            //meter = new Meter(meterName);
+        }
+
+        /// <summary>Updates the meter</summary>
+        /// <param name="value">the event</param>
+        public virtual void OnNext(T value)
+        {
+            //meter.Mark(1);
+        }
+
+        public abstract void OnCompleted();
+
+        public abstract void OnError(Exception error);
+
+        public virtual void Dispose()
+        {
+            // no op
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/IRxStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/IRxStage.cs b/lang/cs/Source/WAKE/Wake/RX/IRxStage.cs
new file mode 100644
index 0000000..e756328
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/IRxStage.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+    /// <summary>Stage that executes the observer</summary>
+    public interface IRxStage<T> : IObserver<T>, IStage
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/IStaticObservable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/IStaticObservable.cs b/lang/cs/Source/WAKE/Wake/RX/IStaticObservable.cs
new file mode 100644
index 0000000..577db4d
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/IStaticObservable.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+    public interface IStaticObservable
+    {
+        //intentionally empty
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/ISubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/ISubject.cs b/lang/cs/Source/WAKE/Wake/RX/ISubject.cs
new file mode 100644
index 0000000..3679470
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/ISubject.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+    /// <summary>A class implementing Observer> and StaticObservable</summary>
+    /// <typeparam name="In">The in type</typeparam>
+    /// <typeparam name="Out">The out type</typeparam>
+    public interface ISubject<In, Out> : IObserver<In>, IStaticObservable
+    {
+        // intentionally empty
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs
new file mode 100644
index 0000000..d513020
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Subjects;
+using System.Reflection;
+using System.Text;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+    /// <summary>
+    /// Subject to provide publish/subscribe interface.
+    /// Subscribes to class Types and invokes handlers for a given
+    /// type on call to OnNext
+    /// </summary>
+    /// <typeparam name="T">The super type that all event types
+    /// inherit from</typeparam>
+    public class PubSubSubject<T> : IObserver<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubSubject<T>));
+
+        private Dictionary<Type, List<object>> _classToObserversMap;
+        private bool _completed;
+        private object _mutex;
+
+        /// <summary>
+        /// Constructs a pub-sub Subject
+        /// </summary>
+        public PubSubSubject()
+        {
+            _classToObserversMap = new Dictionary<Type, List<object>>();
+            _mutex = new object();
+        }
+
+        /// <summary>
+        /// Log on completion
+        /// </summary>
+        public void OnCompleted()
+        {
+            lock (_mutex)
+            {
+                _completed = true;
+            }
+        }
+
+        /// <summary>
+        /// Log Exception
+        /// </summary>
+        /// <param name="error"></param>
+        public void OnError(Exception error)
+        {
+            lock (_mutex)
+            {
+                _completed = true;
+            }
+        }
+
+        /// <summary>
+        /// Invoke the subscribed handlers for the event class type
+        /// </summary>
+        /// <param name="value">The event to process</param>
+        public void OnNext(T value)
+        {
+            if (value == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+            }
+
+            lock (_mutex)
+            {
+                // If OnCompleted or OnError called, do nothing
+                if (_completed)
+                {
+                    return;
+                }
+
+                // Check that the event type has been subscribed
+                List<object> handlers;
+                if (!_classToObserversMap.TryGetValue(value.GetType(), out handlers))
+                {
+                    Exceptions.Throw(new ArgumentException("No event for type " + value.GetType()), LOGGER);
+                }
+
+                // Invoke each IObserver for the event type
+                foreach (object handler in handlers)
+                {
+                    Type handlerType = typeof(IObserver<>).MakeGenericType(new[] { value.GetType() });
+                    MethodInfo info = handlerType.GetMethod("OnNext");
+                    info.Invoke(handler, new[] { (object) value });
+                }
+            }
+        }
+
+        /// <summary>
+        /// Subscribe an IObserver for an event type
+        /// </summary>
+        /// <typeparam name="U">The event type</typeparam>
+        /// <param name="observer">The observer to handle the event</param>
+        /// <returns>An IDisposable object used to handle unsubscribing
+        /// the IObserver</returns>
+        public IDisposable Subscribe<U>(IObserver<U> observer) where U : T
+        {
+            lock (_mutex)
+            {
+                List<object> observers;
+                if (!_classToObserversMap.TryGetValue(typeof(U), out observers))
+                {
+                    observers = new List<object>();
+                    _classToObserversMap[typeof(U)] = observers;
+                }
+                observers.Add(observer);
+            }
+
+            return new DisposableResource<U>(_classToObserversMap, observer, _mutex);
+        }
+
+        /// <summary>
+        /// Utility class to handle disposing of an IObserver
+        /// </summary>
+        private class DisposableResource<U> : IDisposable
+        {
+            private Dictionary<Type, List<object>> _observersMap;
+            private IObserver<U> _observer;
+            private object _mutex;
+            private bool _disposed;
+            
+            public DisposableResource(Dictionary<Type, List<object>> observersMap, IObserver<U> observer, object mutex)
+            {
+                _observersMap = observersMap;
+                _observer = observer;
+                _mutex = mutex;
+                _disposed = false;
+            }
+
+            /// <summary>
+            /// Unsubscribe the IObserver from the observer map
+            /// </summary>
+            public void Dispose()
+            {
+                if (!_disposed)
+                {
+                    UnsubscribeObserver();
+                    _disposed = true;
+                }
+            }
+
+            private void UnsubscribeObserver()
+            {
+                lock (_mutex)
+                {
+                    List<object> observers;
+                    if (_observersMap.TryGetValue(typeof(U), out observers))
+                    {
+                        observers.Remove(_observer);
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs
new file mode 100644
index 0000000..4803f89
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+using System;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+    /// <summary>Stage that executes the observer synchronously</summary>
+    public class RxSyncStage<T> : AbstractRxStage<T>
+    {
+        private readonly IObserver<T> _observer;
+
+        /// <summary>Constructs a Rx synchronous stage</summary>
+        /// <param name="observer">the observer</param>
+        public RxSyncStage(IObserver<T> observer) : base(observer.GetType().FullName)
+        {
+            _observer = observer;
+        }
+
+        /// <summary>Provides the observer with the new value</summary>
+        /// <param name="value">the new value</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            _observer.OnNext(value);
+        }
+
+        /// <summary>
+        /// Notifies the observer that the provider has experienced an error
+        /// condition.
+        /// </summary>
+        /// <param name="error">the error</param>
+        public override void OnError(Exception error)
+        {
+            _observer.OnError(error);
+        }
+
+        /// <summary>
+        /// Notifies the observer that the provider has finished sending push-based
+        /// notifications.
+        /// </summary>
+        public override void OnCompleted()
+        {
+            _observer.OnCompleted();
+        }
+
+        /// <summary>
+        /// Closes the stage
+        /// </summary>
+        public override void Dispose()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs
new file mode 100644
index 0000000..4986055
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Util;
+using System;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+    /// <summary>Stage that executes the observer with a thread pool</summary>
+    public class RxThreadPoolStage<T> : AbstractRxStage<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RxThreadPoolStage<T>));
+
+        private readonly IObserver<T> _observer;
+
+        private readonly ITaskService _taskService;
+
+        /// <summary>Constructs a Rx thread pool stage</summary>
+        /// <param name="observer">the observer to execute</param>
+        /// <param name="numThreads">the number of threads</param>
+        public RxThreadPoolStage(IObserver<T> observer, int numThreads) 
+            : base(observer.GetType().FullName)
+        {
+            _observer = observer;
+            if (numThreads <= 0)
+            {
+                Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
+            }
+            _taskService = new FixedThreadPoolTaskService(numThreads);
+        }
+
+        /// <summary>Provides the observer with the new value</summary>
+        /// <param name="value">the new value</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            _taskService.Execute(new _Startable_58(this, value).Start);
+        }
+
+        /// <summary>
+        /// Notifies the observer that the provider has experienced an error
+        /// condition.
+        /// </summary>
+        /// <param name="error">the error</param>
+        public override void OnError(Exception error)
+        {
+            _taskService.Execute(new _Startable_75(this, error).Start);
+        }
+
+        /// <summary>
+        /// Notifies the observer that the provider has finished sending push-based
+        /// notifications.
+        /// </summary>
+        public override void OnCompleted()
+        {
+            _taskService.Execute(new _Startable_91(this).Start);
+        }
+
+        /// <summary>
+        /// Closes the stage
+        /// </summary>
+        public override void Dispose()
+        {
+            _taskService.Shutdown();
+        }
+
+        private sealed class _Startable_58 : IStartable
+        {
+            private readonly RxThreadPoolStage<T> _enclosing;
+            private readonly T _value;
+
+            public _Startable_58(RxThreadPoolStage<T> enclosing, T value)
+            {
+                _enclosing = enclosing;
+                _value = value;
+            }
+
+            public void Start()
+            {
+                _enclosing._observer.OnNext(_value);
+            }
+        }
+
+        private sealed class _Startable_75 : IStartable
+        {
+            private readonly RxThreadPoolStage<T> _enclosing;
+            private readonly Exception _error;
+
+            public _Startable_75(RxThreadPoolStage<T> enclosing, Exception error)
+            {
+                _enclosing = enclosing;
+                _error = error;
+            }
+
+            public void Start()
+            {
+                _enclosing._observer.OnError(_error);
+            }
+        }
+
+        private sealed class _Startable_91 : IStartable
+        {
+            private readonly RxThreadPoolStage<T> _enclosing;
+
+            public _Startable_91(RxThreadPoolStage<T> enclosing)
+            {
+                _enclosing = enclosing;
+            }
+
+            public void Start()
+            {
+                _enclosing._observer.OnCompleted();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs
new file mode 100644
index 0000000..44dd77c
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Timers;
+
+using Org.Apache.Reef.Wake.Impl;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+    /// <summary>Timer stage that provides events to the observer periodically</summary>
+    public class RxTimerStage : IStage, IStaticObservable
+    {
+        private readonly Timer _timer;
+        private readonly PeriodicEvent _value = new PeriodicEvent();
+        private readonly IObserver<PeriodicEvent> _observer;
+
+        /// <summary>Constructs a Rx timer stage</summary>
+        /// <param name="observer">the observer</param>
+        /// <param name="period">the period in milli-seconds</param>
+        public RxTimerStage(IObserver<PeriodicEvent> observer, long period) 
+            : this(observer, 0, period)
+        {
+        }
+
+        /// <summary>Constructs a Rx timer stage</summary>
+        /// <param name="observer">the observer</param>
+        /// <param name="initialDelay">the initial delay in milli-seconds</param>
+        /// <param name="period">the period in milli-seconds</param>
+        public RxTimerStage(IObserver<PeriodicEvent> observer, long initialDelay, long period)
+        {
+            _observer = observer;
+            _timer = new Timer(period);
+            _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _observer, _value);
+            _timer.Enabled = true;
+        }
+
+        /// <summary>
+        /// Closes the stage
+        /// </summary>
+        public void Dispose()
+        {
+            _timer.Stop();
+        }
+
+        private static void OnTimedEvent(object source, ElapsedEventArgs e, IObserver<PeriodicEvent> observer, PeriodicEvent value)
+        {
+            observer.OnNext(value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs
new file mode 100644
index 0000000..418dc98
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+using System;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+    /// <summary>A Subject that relays all messages to its subscribers.</summary>
+    public class SimpleSubject<T> : ISubject<T, T>
+    {
+        private readonly IObserver<T> _observer;
+
+        /// <summary>Constructs a simple subject</summary>
+        /// <param name="observer">the observer</param>
+        public SimpleSubject(IObserver<T> observer)
+        {
+            _observer = observer;
+        }
+
+        /// <summary>Provides the observer with the new value</summary>
+        /// <param name="value">the new value</param>
+        public virtual void OnNext(T value)
+        {
+            _observer.OnNext(value);
+        }
+
+        /// <summary>Provides the observer with the error</summary>
+        /// <param name="error">the error</param>
+        public virtual void OnError(Exception error)
+        {
+            _observer.OnError(error);
+        }
+
+        /// <summary>
+        /// Provides the observer with it has finished sending push-based
+        /// notifications.
+        /// </summary>
+        public virtual void OnCompleted()
+        {
+            _observer.OnCompleted();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs b/lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs
new file mode 100644
index 0000000..b562055
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+    /// <summary>
+    /// It is illegal to call onError() or onCompleted() when a call to onNext() is
+    /// still outstanding, or to call onNext(), onError() or onCompleted() after a
+    /// call to onError() or onCompleted() has been dispatched.
+    /// </summary>
+    /// <remarks>
+    /// It is illegal to call onError() or onCompleted() when a call to onNext() is
+    /// still outstanding, or to call onNext(), onError() or onCompleted() after a
+    /// call to onError() or onCompleted() has been dispatched. Observers may throw
+    /// an ObserverCompleted exception whenever this API is violated. Violating the
+    /// API leaves the Observer (and any resources that it holds) in an undefined
+    /// state, and throwing ObserverCompleted exceptions is optional.
+    /// Callers receiving this exception should simply pass it up the stack to the
+    /// Aura runtime. They should not attempt to forward it on to upstream or
+    /// downstream stages. The easiest way to do this is to ignore the exception
+    /// entirely.
+    /// </remarks>
+    [System.Serializable]
+    public class ObserverCompletedException : InvalidOperationException
+    {
+        private const long serialVersionUID = 1L;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/ICodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/ICodec.cs b/lang/cs/Source/WAKE/Wake/Remote/ICodec.cs
new file mode 100644
index 0000000..25f0bce
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/ICodec.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    public interface ICodec
+    {
+    }
+
+    /// <summary>
+    /// Interface for serialization routines that translate back and forth between
+    /// byte arrays with low latency.
+    /// </summary>
+    /// <typeparam name="T">The codec type</typeparam>
+    public interface ICodec<T> : ICodec, IEncoder<T>, IDecoder<T>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs
new file mode 100644
index 0000000..6ba2805
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    [DefaultImplementation(typeof(ByteCodecFactory))]
+    public interface ICodecFactory
+    {
+        object Create(); 
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs
new file mode 100644
index 0000000..ddc72c8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    public interface IDecoder
+    {
+    }
+
+    /// <summary>
+    /// Interface for serialization routines that translate back and forth between
+    /// byte arrays with low latency.
+    /// </summary>
+    /// <typeparam name="T">The decoder type</typeparam>
+    public interface IDecoder<T> : IDecoder
+    {
+        /// <summary>Decodes the given byte array into an object</summary>
+        /// <param name="data"></param>
+        /// <returns>the decoded object</returns>
+        T Decode(byte[] data);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs
new file mode 100644
index 0000000..b2a743f
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    public interface IEncoder
+    {
+    }
+
+    /// <summary>
+    /// Interface for serialization routines that translate back and forth between
+    /// byte arrays with low latency.
+    /// </summary>
+    /// <typeparam name="T">The encoder type</typeparam>
+    public interface IEncoder<T> : IEncoder
+    {
+        /// <summary>Encodes the given object into a Byte Array</summary>
+        /// <param name="obj"></param>
+        /// <returns>a byte[] representation of the object</returns>
+        byte[] Encode(T obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/ILink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/ILink.cs b/lang/cs/Source/WAKE/Wake/Remote/ILink.cs
new file mode 100644
index 0000000..b25ef4f
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/ILink.cs
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Wake.Remote.Impl;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    /// <summary>
+    /// Represents a link between two endpoints
+    /// </summary>
+    public interface ILink<T> : IDisposable
+    {
+        /// <summary>
+        /// Returns the local socket address
+        /// </summary>
+        IPEndPoint LocalEndpoint { get; }
+
+        /// <summary>
+        /// Returns the remote socket address
+        /// </summary>
+        IPEndPoint RemoteEndpoint { get; }
+
+        /// <summary>
+        /// Writes the value to this link asynchronously
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <param name="token">The cancellation token</param>
+        Task WriteAsync(T value, CancellationToken token);
+
+        /// <summary>
+        /// Writes the value to this link synchronously
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        void Write(T value);
+
+        /// <summary>
+        /// Reads the value from this link asynchronously
+        /// </summary>
+        /// <returns>The read data</returns>
+        /// <param name="token">The cancellation token</param>
+        Task<T> ReadAsync(CancellationToken token);
+
+        /// <summary>
+        /// Reads the value from this link synchronously
+        /// </summary>
+        /// <returns>The read data</returns>
+        T Read();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs
new file mode 100644
index 0000000..d693401
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    public interface IRemoteEvent<T>
+    {
+        IPEndPoint LocalEndPoint { get; set; }
+
+        IPEndPoint RemoteEndPoint { get; set; }
+
+        string Source { get; }
+
+        string Sink { get; }
+
+        T Value { get; }
+
+        long Sequence { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs
new file mode 100644
index 0000000..1101774
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    /// <summary>
+    /// An identifier that represents a remote source
+    /// </summary>
+    public abstract class IRemoteIdentifier : IIdentifier
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs
new file mode 100644
index 0000000..fdea1e4
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    /// <summary>Factory that creates a RemoteIdentifier</summary>
+    public interface IRemoteIdentifierFactory : IIdentifierFactory
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteManager.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteManager.cs
new file mode 100644
index 0000000..a572b04
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteManager.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using Org.Apache.Reef.Wake.Remote.Impl;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    public interface IRemoteManager<T> : IStage
+    {
+        IRemoteIdentifier Identifier { get; }
+
+        IPEndPoint LocalEndpoint { get; }
+
+        IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> dest);
+
+        IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint);
+
+        IDisposable RegisterObserver(RemoteEventEndPoint<T> source, IObserver<T> theObserver);
+
+        IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> theObserver);
+
+        IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> theObserver);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs
new file mode 100644
index 0000000..4b3d2a3
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    /// <summary>
+    /// Message received from a remote handler
+    /// </summary>
+    public interface IRemoteMessage<T>
+    {
+        /// <summary>
+        /// Returns a remote identifier of the sender
+        /// </summary>
+        /// <returns>The remote identifier</returns>
+        IRemoteIdentifier Identifier { get; }
+
+        /// <summary>
+        /// Returns an actual message
+        /// </summary>
+        /// <returns>The remote message</returns>
+        T Message { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs b/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs
new file mode 100644
index 0000000..8d859e2
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    public interface ISubscriptionManager
+    {
+        void Unsubscribe(object token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs
new file mode 100644
index 0000000..e596ab7
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class ByteCodec : ICodec<byte[]>
+    {
+        [Inject]
+        public ByteCodec()
+        {
+        }
+
+        public byte[] Encode(byte[] obj)
+        {
+            return obj;
+        }
+
+        public byte[] Decode(byte[] data)
+        {
+            return data;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs
new file mode 100644
index 0000000..333f341
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class ByteCodecFactory : ICodecFactory
+    {
+        [Inject]
+        public ByteCodecFactory()
+        {
+        }
+
+        public object Create()
+        {
+            return new ByteCodec();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
new file mode 100644
index 0000000..184da8a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using System.Linq;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Performs low level network IO operations between hosts
+    /// </summary>
+    public class Channel
+    {
+        private NetworkStream _stream;
+
+        /// <summary>
+        /// Constructs a new Channel with the the connected NetworkStream.
+        /// </summary>
+        /// <param name="stream">The connected stream</param>
+        public Channel(NetworkStream stream)
+        {
+            if (stream == null)
+            {
+                throw new ArgumentNullException("stream");
+            }
+
+            _stream = stream;
+        }
+
+        /// <summary>
+        /// Sends a message to the connected client synchronously
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        public void Write(byte[] message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            byte[] messageBuffer = GenerateMessageBuffer(message);
+            _stream.Write(messageBuffer, 0, messageBuffer.Length);
+        }
+
+        /// <summary>
+        /// Sends a message to the connected client asynchronously
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The awaitable write task</returns>
+        public async Task WriteAsync(byte[] message, CancellationToken token)
+        {
+            byte[] messageBuffer = GenerateMessageBuffer(message);
+            await _stream.WriteAsync(messageBuffer, 0, messageBuffer.Length, token);
+        }
+
+        /// <summary>
+        /// Reads an incoming message as a byte array synchronously.
+        /// The message length is read as the first four bytes.
+        /// </summary>
+        /// <returns>The byte array message</returns>
+        public byte[] Read()
+        {
+            int payloadLength = ReadMessageLength();
+            if (payloadLength == 0)
+            {
+                return null;
+            }
+
+            return ReadBytes(payloadLength);
+        }
+
+        /// <summary>
+        /// Reads an incoming message as a byte array asynchronously.
+        /// The message length is read as the first four bytes.
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The byte array message</returns>
+        public async Task<byte[]> ReadAsync(CancellationToken token)
+        {
+            int payloadLength = await GetMessageLengthAsync(token);
+            if (payloadLength == 0)
+            {
+                return null;
+            }
+
+            return await ReadBytesAsync(payloadLength, token);
+        }
+
+        /// <summary>
+        /// Helper method to read the specified number of bytes from the network stream.
+        /// </summary>
+        /// <param name="bytesToRead">The number of bytes to read</param>
+        /// <returns>The byte[] read from the network stream with the requested 
+        /// number of bytes, otherwise null if the operation failed.
+        /// </returns>
+        private byte[] ReadBytes(int bytesToRead)
+        {
+            int totalBytesRead = 0;
+            byte[] buffer = new byte[bytesToRead];
+
+            while (totalBytesRead < bytesToRead)
+            {
+                int bytesRead = _stream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead);
+                if (bytesRead == 0)
+                {
+                    // Read timed out or connection was closed
+                    return null;
+                }
+
+                totalBytesRead += bytesRead;
+            }
+
+            return buffer;
+        }
+
+        /// <summary>
+        /// Helper method to read the specified number of bytes from the network stream.
+        /// </summary>
+        /// <param name="bytesToRead">The number of bytes to read</param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The byte[] read from the network stream with the requested 
+        /// number of bytes, otherwise null if the operation failed.
+        /// </returns>
+        private async Task<byte[]> ReadBytesAsync(int bytesToRead, CancellationToken token)
+        {
+            int bytesRead = 0;
+            byte[] buffer = new byte[bytesToRead];
+
+            while (bytesRead < bytesToRead)
+            {
+                int amountRead = await _stream.ReadAsync(buffer, bytesRead, bytesToRead - bytesRead, token);
+                if (amountRead == 0)
+                {
+                    // Read timed out or connection was closed
+                    return null;
+                }
+
+                bytesRead += amountRead;
+            }
+
+            return buffer;
+        }
+
+        /// <summary>
+        /// Generates the payload buffer containing the message along
+        /// with a header indicating the message length.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <returns>The payload buffer</returns>
+        private byte[] GenerateMessageBuffer(byte[] message)
+        {
+            byte[] lengthBuffer1 = BitConverter.GetBytes(message.Length + 4);
+            byte[] lengthBuffer2 = BitConverter.GetBytes(message.Length);
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(lengthBuffer1);
+            }
+
+            int len = lengthBuffer1.Length + lengthBuffer2.Length + message.Length;
+            byte[] messageBuffer = new byte[len];
+
+            int bytesCopied = 0;
+            bytesCopied += CopyBytes(lengthBuffer1, messageBuffer, 0);
+            bytesCopied += CopyBytes(lengthBuffer2, messageBuffer, bytesCopied);
+            CopyBytes(message, messageBuffer, bytesCopied);
+
+            return messageBuffer;
+        }
+
+        /// <summary>
+        /// Reads the first four bytes from the stream and decode
+        /// it to get the message length in bytes
+        /// </summary>
+        /// <returns>The incoming message's length in bytes</returns>
+        private int ReadMessageLength()
+        {
+            byte[] lenBytes = ReadBytes(sizeof(int));
+            if (lenBytes == null)
+            {
+                return 0;
+            }
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(lenBytes);
+            }
+            if (BitConverter.ToInt32(lenBytes, 0) == 0)
+            {
+                return 0;
+            }
+                
+            byte[] msgLength = ReadBytes(sizeof(int));
+            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
+        }
+
+        /// <summary>
+        /// Reads the first four bytes from the stream and decode
+        /// it to get the message length in bytes
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The incoming message's length in bytes</returns>
+        private async Task<int> GetMessageLengthAsync(CancellationToken token)
+        {
+            byte[] lenBytes = await ReadBytesAsync(sizeof(int), token);
+            if (lenBytes == null)
+            {
+                return 0;
+            }
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(lenBytes);
+            }
+            if (BitConverter.ToInt32(lenBytes, 0) == 0)
+            {
+                return 0;
+            }
+                
+            byte[] msgLength = ReadBytes(sizeof(int));
+            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
+        }
+
+        /// <summary>
+        /// Copies the entire source buffer into the destination buffer the specified
+        /// destination offset.
+        /// </summary>
+        /// <param name="source">The source buffer to be copied</param>
+        /// <param name="dest">The destination buffer to copy to</param>
+        /// <param name="destOffset">The offset at the destination buffer to begin
+        /// copying.</param>
+        /// <returns>The number of bytes copied</returns>
+        private int CopyBytes(byte[] source, byte[] dest, int destOffset)
+        {
+            Buffer.BlockCopy(source, 0, dest, destOffset, source.Length);
+            return source.Length;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
new file mode 100644
index 0000000..2bba3c8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Util;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Reactive;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Manages incoming and outgoing messages between remote hosts.
+    /// </summary>
+    public class DefaultRemoteManager<T> : IRemoteManager<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultRemoteManager<T>));
+
+        private ObserverContainer<T> _observerContainer;
+        private TransportServer<IRemoteEvent<T>> _server; 
+        private Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+        private ICodec<IRemoteEvent<T>> _codec;
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager listening on the specified address and any
+        /// available port.
+        /// </summary>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="codec">The codec used for serializing messages</param>
+        public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec) : this(localAddress, 0, codec)
+        {
+        }
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager listening on the specified IPEndPoint.
+        /// </summary>
+        /// <param name="localEndpoint">The endpoint to listen on</param>
+        /// <param name="codec">The codec used for serializing messages</param>
+        public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec)
+        {
+            if (localEndpoint == null)
+            {
+                throw new ArgumentNullException("localEndpoint");
+            }
+            if (localEndpoint.Port < 0)
+            {
+                throw new ArgumentException("Listening port must be greater than or equal to zero");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            _codec = new RemoteEventCodec<T>(codec);
+            _observerContainer = new ObserverContainer<T>();
+            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+            // Begin to listen for incoming messages
+            _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
+            _server.Run();
+
+            LocalEndpoint = _server.LocalEndpoint;
+            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+        }
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager listening on the specified address and any
+        /// available port.
+        /// </summary>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="port">The port to listen on</param>
+        /// <param name="codec">The codec used for serializing messages</param>
+        public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec)
+        {
+            if (localAddress == null)
+            {
+                throw new ArgumentNullException("localAddress");
+            }
+            if (port < 0)
+            {
+                throw new ArgumentException("Listening port must be greater than or equal to zero");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            _observerContainer = new ObserverContainer<T>();
+            _codec = new RemoteEventCodec<T>(codec);
+            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+            IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
+
+            // Begin to listen for incoming messages
+            _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
+            _server.Run();
+
+            LocalEndpoint = _server.LocalEndpoint;
+            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+        }
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
+        /// </summary>
+        /// <param name="codec">The codec used for serializing messages</param>
+        public DefaultRemoteManager(ICodec<T> codec)
+        {
+            using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
+            {
+                if (codec == null)
+                {
+                    throw new ArgumentNullException("codec");
+                }
+
+                _observerContainer = new ObserverContainer<T>();
+                _codec = new RemoteEventCodec<T>(codec);
+                _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
+                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+            }
+        }
+
+        /// <summary>
+        /// Gets the RemoteIdentifier for the DefaultRemoteManager
+        /// </summary>
+        public IRemoteIdentifier Identifier { get; private set; }
+
+        /// <summary>
+        /// Gets the local IPEndPoint for the DefaultRemoteManager
+        /// </summary>
+        public IPEndPoint LocalEndpoint { get; private set; }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return GetRemoteObserver(id.Addr);
+        }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            ProxyObserver remoteObserver;
+            if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
+            {
+                TransportClient<IRemoteEvent<T>> client = 
+                    new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer);
+
+                remoteObserver = new ProxyObserver(client);
+                _cachedClients[remoteEndpoint] = remoteObserver;
+            }
+
+            return remoteObserver;
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return RegisterObserver(id.Addr, observer);
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            return _observerContainer.RegisterObserver(observer);
+        }
+
+        /// <summary>
+        /// Release all resources for the DefaultRemoteManager.
+        /// </summary>
+        public void Dispose()
+        {
+            foreach (ProxyObserver cachedClient in _cachedClients.Values)
+            {
+                cachedClient.Dispose();
+            }
+
+            if (_server != null)
+            {
+                _server.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Observer to send messages to connected remote host
+        /// </summary>
+        private class ProxyObserver : IObserver<T>, IDisposable
+        {
+            private TransportClient<IRemoteEvent<T>> _client;
+            private int _messageCount;
+
+            /// <summary>
+            /// Create new ProxyObserver
+            /// </summary>
+            /// <param name="client">The connected transport client used to send
+            /// messages to remote host</param>
+            public ProxyObserver(TransportClient<IRemoteEvent<T>> client)
+            {
+                _client = client;
+                _messageCount = 0;
+            }
+
+            /// <summary>
+            /// Send the message to the remote host
+            /// </summary>
+            /// <param name="message">The message to send</param>
+            public void OnNext(T message)
+            {
+                IRemoteEvent<T> remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message)
+                {
+                    Sink = "default",
+                    Sequence = _messageCount
+                };
+
+                _messageCount++;
+                _client.Send(remoteEvent);
+            }
+
+            /// <summary>
+            /// Close underlying transport client
+            /// </summary>
+            public void Dispose()
+            {
+                _client.Dispose();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
new file mode 100644
index 0000000..5b24276
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    class DefaultRemoteMessage<T> : IRemoteMessage<T>
+    {
+        public DefaultRemoteMessage(IRemoteIdentifier id, T message)
+        {
+            Identifier = id;
+            Message = message;
+        }
+
+        public IRemoteIdentifier Identifier { get; private set; }
+
+        public T Message { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
new file mode 100644
index 0000000..8d4b47d
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Class to compare two IPEndPoint objects.
+    /// </summary>
+    internal class IPEndPointComparer : IEqualityComparer<IPEndPoint>
+    {
+        public bool Equals(IPEndPoint x, IPEndPoint y)
+        {
+            if (ReferenceEquals(x, y))
+            {
+                return true;
+            }
+            if (x == null || y == null)
+            {
+                return false;
+            }
+
+            // If either port is 0, don't check port
+            if (x.Port == 0 || y.Port == 0)
+            {
+                return x.Address.Equals(y.Address);
+            }
+
+            return x.Equals(y);
+        }
+
+        public int GetHashCode(IPEndPoint obj)
+        {
+            return obj.Address.GetHashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
new file mode 100644
index 0000000..e413023
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class IntCodec : ICodec<int>
+    {
+        [Inject]
+        public IntCodec()
+        {
+        }
+
+        public byte[] Encode(int obj)
+        {
+            return BitConverter.GetBytes(obj);
+        }
+
+        public int Decode(byte[] data)
+        {
+            return BitConverter.ToInt32(data, 0);
+        }
+    }
+}