You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/05/06 02:14:00 UTC

incubator-reef git commit: [REEF-258] Improve RemoteManager memory efficiency by using Writables

Repository: incubator-reef
Updated Branches:
  refs/heads/master 31d41f886 -> a8ebe24e5


[REEF-258] Improve RemoteManager memory efficiency by using Writables

This change adds the `WritableRemoteManager` implementation of `IRemoteManager`.
It is more memory efficient as it only creates one serialized copy of each
message instead of requiring it to be pre-serialized into a `byte[]`.

JIRA:
  [REEF-258](https://issues.apache.org/jira/browse/REEF-258)

Pull Request:
  This closes #170


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/a8ebe24e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/a8ebe24e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/a8ebe24e

Branch: refs/heads/master
Commit: a8ebe24e5e626f916dfb6276184284725228361d
Parents: 31d41f8
Author: dkm2110 <dh...@gmail.com>
Authored: Fri May 1 15:49:10 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue May 5 17:09:56 2015 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Wake.Tests.csproj           |   1 +
 .../WritableRemoteManagerTest.cs                | 375 +++++++++++++++++++
 .../Impl/DefaultRemoteManagerFactory.cs         |  53 ---
 .../Org.Apache.REEF.Wake.csproj                 |   7 +-
 .../Remote/IWritableRemoteEvent.cs              |  45 +++
 .../Remote/Impl/DefaultRemoteManagerFactory.cs  |  53 +++
 .../Remote/Impl/WritableObserverContainer.cs    | 132 +++++++
 .../Remote/Impl/WritableRemoteEvent.cs          | 109 ++++++
 .../Remote/Impl/WritableRemoteManager.cs        | 279 ++++++++++++++
 .../Remote/Impl/WritableRemoteManagerFactory.cs |  55 +++
 10 files changed, 1055 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index d709933..f947422 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -49,6 +49,7 @@ under the License.
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="PubSubSubjectTest.cs" />
     <Compile Include="RemoteManagerTest.cs" />
+    <Compile Include="WritableRemoteManagerTest.cs" />
     <Compile Include="WritableTransportTest.cs" />
     <Compile Include="TransportTest.cs" />
     <Compile Include="WritableString.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
new file mode 100644
index 0000000..6f7baf9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Impl;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    [TestClass]
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public class WritableRemoteManagerTest
+    {
+        private readonly WritableRemoteManagerFactory _remoteManagerFactory =
+            TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>();
+        
+        /// <summary>
+        /// Tests one way communication between Remote Managers 
+        /// Remote Manager listens on any available port
+        /// </summary>
+        [TestMethod]
+        public void TestWritableOneWayCommunication()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var observer = Observer.Create<WritableString>(queue.Add);
+                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+                remoteManager2.RegisterObserver(endpoint1, observer);
+
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext(new WritableString("ghi"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Tests one way communication between Remote Managers 
+        /// Remote manager listens on a particular port
+        /// </summary>
+        [TestMethod]
+        public void TestWritableOneWayCommunicationClientOnly()
+        {
+            int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000);
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>())
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, listeningPort))
+            {
+                IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer = Observer.Create<WritableString>(queue.Add);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer);
+
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext(new WritableString("ghi"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Tests two way communications. Checks whether both sides are able to receive messages
+        /// </summary>
+        [TestMethod]
+        public void TestWritableTwoWayCommunication()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // Register observers for remote manager 1 and remote manager 2
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer1 = Observer.Create<WritableString>(queue1.Add);
+                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("def"));
+                remoteObserver1.OnNext(new WritableString("ghi"));
+
+                // Remote manager 2 sends 4 events to remote manager 1
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                remoteObserver2.OnNext(new WritableString("jkl"));
+                remoteObserver2.OnNext(new WritableString("mno"));
+                remoteObserver2.OnNext(new WritableString("pqr"));
+                remoteObserver2.OnNext(new WritableString("stu"));
+
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+            }
+
+            Assert.AreEqual(4, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+        }
+
+        /// <summary>
+        /// Tests one way communication between 3 nodes.
+        /// nodes 1 and 2 send messages to node 3
+        /// </summary>
+        [TestMethod]
+        public void TestWritableCommunicationThreeNodesOneWay()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager3 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer = Observer.Create<WritableString>(queue.Add);
+                remoteManager3.RegisterObserver(remoteEndpoint, observer);
+
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+                remoteObserver2.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("def"));
+                remoteObserver2.OnNext(new WritableString("ghi"));
+                remoteObserver1.OnNext(new WritableString("jkl"));
+                remoteObserver2.OnNext(new WritableString("mno"));
+
+                for (int i = 0; i < 5; i++)
+                {
+                    events.Add(queue.Take().Data);
+                }
+            }
+
+            Assert.AreEqual(5, events.Count);
+        }
+
+        /// <summary>
+        /// Tests one way communication between 3 nodes.
+        /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
+        /// </summary>
+        [TestMethod]
+        public void TestWritableCommunicationThreeNodesBothWays()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+            List<string> events3 = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager3 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+
+                var observer = Observer.Create<WritableString>(queue1.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer);
+                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+                var observer3 = Observer.Create<WritableString>(queue3.Add);
+                remoteManager3.RegisterObserver(remoteEndpoint, observer3);
+
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+                // Observer 1 and 2 send messages to observer 3
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver2.OnNext(new WritableString("def"));
+                remoteObserver2.OnNext(new WritableString("def"));
+
+                // Observer 3 sends messages back to observers 1 and 2
+                var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
+
+                remoteObserver3A.OnNext(new WritableString("ghi"));
+                remoteObserver3A.OnNext(new WritableString("ghi"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+            }
+
+            Assert.AreEqual(2, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+            Assert.AreEqual(5, events3.Count);
+        }
+
+        /// <summary>
+        /// Tests whether remote manager is able to send acknowledgement back
+        /// </summary>
+        [TestMethod]
+        public void TestWritableRemoteSenderCallback()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // Register handler for when remote manager 2 receives events; respond
+                // with an ack
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+
+                var receiverObserver = Observer.Create<WritableString>(
+                    message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
+                remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
+
+                // Register handler for remote manager 1 to record the ack
+                var senderObserver = Observer.Create<WritableString>(queue.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
+
+                // Begin to send messages
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(new WritableString("hello"));
+                remoteObserver1.OnNext(new WritableString("there"));
+                remoteObserver1.OnNext(new WritableString("buddy"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual("received message: hello", events[0]);
+            Assert.AreEqual("received message: there", events[1]);
+            Assert.AreEqual("received message: buddy", events[2]);
+        }
+        
+        /// <summary>
+        /// Test whether observer can be created with IRemoteMessage interface
+        /// </summary>
+        [TestMethod]
+        public void TestWritableRegisterObserverByType()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
+                var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
+                remoteManager2.RegisterObserver(observer);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext(new WritableString("ghi"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Tests whether we get the cached observer back for sending message without reinstantiating it
+        /// </summary>
+        [TestMethod]
+        public void TestWritableCachedConnection()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var observer = Observer.Create<WritableString>(queue.Add);
+                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+                remoteManager2.RegisterObserver(endpoint1, observer);
+
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+
+                var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                cachedObserver.OnNext(new WritableString("ghi"));
+                cachedObserver.OnNext(new WritableString("jkl"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(4, events.Count);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs
deleted file mode 100644
index 38a020f..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Net;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.Wake.Impl
-{
-    /// <summary>
-    /// An implementation of IRemoteManagerFactory for DefaultRemoteManager.
-    /// </summary>
-    internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory
-    {
-        [Inject]
-        private DefaultRemoteManagerFactory()
-        {
-        }
-
-        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec)
-        {
-#pragma warning disable 618
-            // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
-            return new DefaultRemoteManager<T>(localAddress, port, codec);
-#pragma warning restore 618
-        }
-
-        public IRemoteManager<T> GetInstance<T>(ICodec<T> codec)
-        {
-#pragma warning disable 618
-            // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
-            return new DefaultRemoteManager<T>(codec);
-#pragma warning restore 618
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 26f2fbf..a62d524 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -48,7 +48,8 @@ under the License.
     <Compile Include="IEventHandler.cs" />
     <Compile Include="IIdentifier.cs" />
     <Compile Include="IIdentifierFactory.cs" />
-    <Compile Include="Impl\DefaultRemoteManagerFactory.cs" />
+    <Compile Include="Remote\Impl\WritableRemoteManagerFactory.cs" />
+    <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
     <Compile Include="Impl\LoggingEventHandler.cs" />
     <Compile Include="Impl\MissingStartHandlerHandler.cs" />
     <Compile Include="Impl\MultiEventHandler.cs" />
@@ -65,11 +66,15 @@ under the License.
     <Compile Include="Remote\IDataWriter.cs" />
     <Compile Include="Remote\Impl\StreamDataReader.cs" />
     <Compile Include="Remote\Impl\StreamDataWriter.cs" />
+    <Compile Include="Remote\Impl\WritableRemoteManager.cs" />
     <Compile Include="Remote\Impl\WritableLink.cs" />
+    <Compile Include="Remote\Impl\WritableObserverContainer.cs" />
+    <Compile Include="Remote\Impl\WritableRemoteEvent.cs" />
     <Compile Include="Remote\Impl\WritableTransportClient.cs" />
     <Compile Include="Remote\Impl\WritableTransportServer.cs" />
     <Compile Include="Remote\IRemoteManagerFactory.cs" />
     <Compile Include="Remote\IWritable.cs" />
+    <Compile Include="Remote\IWritableRemoteEvent.cs" />
     <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
     <Compile Include="Remote\ICodec.cs" />
     <Compile Include="Remote\ICodecFactory.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
new file mode 100644
index 0000000..40222aa
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Linq.Expressions;
+using System.Net;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Interface for remote event
+    /// </summary>
+    /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    internal interface IWritableRemoteEvent<T> : IWritable where T : IWritable
+    {
+        /// <summary>
+        /// Local Endpoint
+        /// </summary>
+        IPEndPoint LocalEndPoint { get; set; }
+
+        /// <summary>
+        /// Remote Endpoint
+        /// </summary>
+        IPEndPoint RemoteEndPoint { get; set; }
+
+        T Value { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
new file mode 100644
index 0000000..38a020f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>
+    /// An implementation of IRemoteManagerFactory for DefaultRemoteManager.
+    /// </summary>
+    internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory
+    {
+        [Inject]
+        private DefaultRemoteManagerFactory()
+        {
+        }
+
+        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec)
+        {
+#pragma warning disable 618
+            // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+            return new DefaultRemoteManager<T>(localAddress, port, codec);
+#pragma warning restore 618
+        }
+
+        public IRemoteManager<T> GetInstance<T>(ICodec<T> codec)
+        {
+#pragma warning disable 618
+            // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+            return new DefaultRemoteManager<T>(codec);
+#pragma warning restore 618
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
new file mode 100644
index 0000000..9790e3e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using Org.Apache.REEF.Wake.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Stores registered IObservers for DefaultRemoteManager.
+    /// Can register and look up IObservers by remote IPEndPoint.
+    /// </summary>
+    /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    internal class WritableObserverContainer<T> : IObserver<TransportEvent<IWritableRemoteEvent<T>>> where T : IWritable
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(WritableObserverContainer<>));
+        private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
+        private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
+        private IObserver<T> _universalObserver;
+
+        /// <summary>
+        /// Constructs a new ObserverContainer used to manage remote IObservers.
+        /// </summary>
+        public WritableObserverContainer()
+        {
+            _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
+            _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer) 
+        {
+            if (remoteEndpoint.Address.Equals(IPAddress.Any))
+            {
+                _universalObserver = observer;
+                return Disposable.Create(() => { _universalObserver = null; });
+            }
+
+            _endpointMap[remoteEndpoint] = observer;
+            return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer));
+        }
+
+        /// <summary>
+        /// Registers an IObserver to handle incoming messages from a remote host
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            _typeMap[typeof(T)] = observer;
+            return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
+        }
+
+        /// <summary>
+        /// Look up the IObserver for the registered IPEndPoint or event type 
+        /// and execute the IObserver.
+        /// </summary>
+        /// <param name="transportEvent">The incoming remote event</param>
+        public void OnNext(TransportEvent<IWritableRemoteEvent<T>> transportEvent)
+        {
+            IWritableRemoteEvent<T> remoteEvent = transportEvent.Data;
+            remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
+            remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
+            T value = remoteEvent.Value;
+            bool handled = false;
+
+            IObserver<T> observer1;
+            IObserver<IRemoteMessage<T>> observer2;
+            if (_universalObserver != null)
+            {
+                _universalObserver.OnNext(value);
+                handled = true;
+            }
+            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1))
+            {
+                // IObserver was registered by IPEndpoint
+                observer1.OnNext(value);
+                handled = true;
+            } 
+            else if (_typeMap.TryGetValue(value.GetType(), out observer2))
+            {
+                // IObserver was registered by event type
+                IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
+                IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
+                observer2.OnNext(remoteMessage);
+                handled = true;
+            }
+
+            if (!handled)
+            {
+                throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+            throw error;
+        }
+
+        public void OnCompleted()
+        {
+            Logger.Log(Level.Info, "Exiting the Writable Observer Container");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
new file mode 100644
index 0000000..b9702d6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Writable remote event class
+    /// </summary>
+    /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> where T : IWritable
+    {
+        /// <summary>
+        /// Creates the Remote Event
+        /// </summary>
+        /// <param name="localEndpoint">Local Address</param>
+        /// <param name="remoteEndpoint">Remote Address</param>
+        /// <param name="value">Actual message</param>
+        public WritableRemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
+        {
+            LocalEndPoint = localEndpoint;
+            RemoteEndPoint = remoteEndpoint;
+            Value = value;
+        }
+
+        /// <summary>
+        /// Creates empty Remote Event
+        /// </summary>
+        public WritableRemoteEvent()
+        {
+        }
+
+        /// <summary>
+        /// Local Address
+        /// </summary>
+        public IPEndPoint LocalEndPoint { get; set; }
+
+        /// <summary>
+        /// Remote Address
+        /// </summary>
+        public IPEndPoint RemoteEndPoint { get; set; }
+
+        /// <summary>
+        /// The actual message
+        /// </summary>
+        public T Value { get; set; }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        public void Read(IDataReader reader)
+        {
+            Value = Activator.CreateInstance<T>();
+            Value.Read(reader);         
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(IDataWriter writer)
+        {
+            Value.Write(writer);           
+        }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        public async Task ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            Value = Activator.CreateInstance<T>();
+            await Value.ReadAsync(reader, token);      
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
+        {
+            await Value.WriteAsync(writer, token);    
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
new file mode 100644
index 0000000..285db71
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Manages incoming and outgoing messages between remote hosts.
+    /// </summary>
+    /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public sealed class WritableRemoteManager<T> : IRemoteManager<T> where T : IWritable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof (WritableRemoteManager<T>));
+
+        private readonly WritableObserverContainer<T> _observerContainer;
+        private readonly WritableTransportServer<IWritableRemoteEvent<T>> _server;
+        private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager listening on the specified address and
+        /// a specific port.
+        /// </summary>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="port">The port to listen on</param>
+        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
+        public WritableRemoteManager(IPAddress localAddress, int port)
+        {
+            if (localAddress == null)
+            {
+                throw new ArgumentNullException("localAddress");
+            }
+            if (port < 0)
+            {
+                throw new ArgumentException("Listening port must be greater than or equal to zero");
+            }
+
+            _observerContainer = new WritableObserverContainer<T>();
+            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+            IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
+
+            // Begin to listen for incoming messages
+            _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer);
+            _server.Run();
+
+            LocalEndpoint = _server.LocalEndpoint;  
+            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+        }
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
+        /// </summary>
+        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
+        public WritableRemoteManager()
+        {
+            using (LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager"))
+            {
+                _observerContainer = new WritableObserverContainer<T>();
+                _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
+                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+            }
+        }
+
+        /// <summary>
+        /// Gets the RemoteIdentifier for the DefaultRemoteManager
+        /// </summary>
+        public IRemoteIdentifier Identifier { get; private set; }
+
+        /// <summary>
+        /// Gets the local IPEndPoint for the DefaultRemoteManager
+        /// </summary>
+        public IPEndPoint LocalEndpoint { get; private set; }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return GetRemoteObserver(id.Addr);
+        }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            ProxyObserver remoteObserver;
+            if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
+            {
+                WritableTransportClient<IWritableRemoteEvent<T>> client =
+                    new WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, _observerContainer);
+
+                remoteObserver = new ProxyObserver(client);
+                _cachedClients[remoteEndpoint] = remoteObserver;
+            }
+
+            return remoteObserver;
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return RegisterObserver(id.Addr, observer);
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            return _observerContainer.RegisterObserver(observer);
+        }
+
+        /// <summary>
+        /// Release all resources for the DefaultRemoteManager.
+        /// </summary>
+        public void Dispose()
+        {
+            foreach (ProxyObserver cachedClient in _cachedClients.Values)
+            {
+                cachedClient.Dispose();
+            }
+
+            if (_server != null)
+            {
+                _server.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Observer to send messages to connected remote host
+        /// </summary>
+        private class ProxyObserver : IObserver<T>, IDisposable
+        {
+            private readonly WritableTransportClient<IWritableRemoteEvent<T>> _client;
+
+            /// <summary>
+            /// Create new ProxyObserver
+            /// </summary>
+            /// <param name="client">The connected WritableTransport client used to send
+            /// messages to remote host</param>
+            public ProxyObserver(WritableTransportClient<IWritableRemoteEvent<T>> client)
+            {
+                _client = client;
+            }
+
+            /// <summary>
+            /// Send the message to the remote host
+            /// </summary>
+            /// <param name="message">The message to send</param>
+            public void OnNext(T message)
+            {
+                IWritableRemoteEvent<T> remoteEvent = new WritableRemoteEvent<T>(_client.Link.LocalEndpoint,
+                    _client.Link.RemoteEndpoint,
+                    message);
+
+                _client.Send(remoteEvent);
+            }
+
+            /// <summary>
+            /// Close underlying WritableTransport client
+            /// </summary>
+            public void Dispose()
+            {
+                _client.Dispose();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw error;
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
new file mode 100644
index 0000000..6d3c4ad
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>
+    /// WritableRemoteManagerFactory for WritableRemoteManager.
+    /// </summary>
+    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+    public sealed class WritableRemoteManagerFactory
+    {
+        [Inject]
+        private WritableRemoteManagerFactory()
+        {
+        }
+
+        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+        {
+#pragma warning disable 618
+// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+            return new WritableRemoteManager<T>(localAddress, port);
+#pragma warning disable 618
+        }
+
+        public IRemoteManager<T> GetInstance<T>() where T : IWritable
+        {
+#pragma warning disable 618
+// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+            return new WritableRemoteManager<T>();
+#pragma warning disable 618
+        }
+    }
+}
\ No newline at end of file