You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/05/06 02:14:00 UTC
incubator-reef git commit: [REEF-258] Improve RemoteManager memory
efficiency by using Writables
Repository: incubator-reef
Updated Branches:
refs/heads/master 31d41f886 -> a8ebe24e5
[REEF-258] Improve RemoteManager memory efficiency by using Writables
This change adds the `WritableRemoteManager` implementation of `IRemoteManager`.
It is more memory efficient as it only creates one serialized copy of each
message instead of requiring it to be pre-serialized into a `byte[]`.
JIRA:
[REEF-258](https://issues.apache.org/jira/browse/REEF-258)
Pull Request:
This closes #170
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/a8ebe24e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/a8ebe24e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/a8ebe24e
Branch: refs/heads/master
Commit: a8ebe24e5e626f916dfb6276184284725228361d
Parents: 31d41f8
Author: dkm2110 <dh...@gmail.com>
Authored: Fri May 1 15:49:10 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue May 5 17:09:56 2015 -0700
----------------------------------------------------------------------
.../Org.Apache.REEF.Wake.Tests.csproj | 1 +
.../WritableRemoteManagerTest.cs | 375 +++++++++++++++++++
.../Impl/DefaultRemoteManagerFactory.cs | 53 ---
.../Org.Apache.REEF.Wake.csproj | 7 +-
.../Remote/IWritableRemoteEvent.cs | 45 +++
.../Remote/Impl/DefaultRemoteManagerFactory.cs | 53 +++
.../Remote/Impl/WritableObserverContainer.cs | 132 +++++++
.../Remote/Impl/WritableRemoteEvent.cs | 109 ++++++
.../Remote/Impl/WritableRemoteManager.cs | 279 ++++++++++++++
.../Remote/Impl/WritableRemoteManagerFactory.cs | 55 +++
10 files changed, 1055 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index d709933..f947422 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -49,6 +49,7 @@ under the License.
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PubSubSubjectTest.cs" />
<Compile Include="RemoteManagerTest.cs" />
+ <Compile Include="WritableRemoteManagerTest.cs" />
<Compile Include="WritableTransportTest.cs" />
<Compile Include="TransportTest.cs" />
<Compile Include="WritableString.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
new file mode 100644
index 0000000..6f7baf9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Impl;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ [TestClass]
+ [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+ public class WritableRemoteManagerTest
+ {
+ private readonly WritableRemoteManagerFactory _remoteManagerFactory =
+ TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>();
+
+ /// <summary>
+ /// Tests one way communication between Remote Managers
+ /// Remote Manager listens on any available port
+ /// </summary>
+ [TestMethod]
+ public void TestWritableOneWayCommunication()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var observer = Observer.Create<WritableString>(queue.Add);
+ IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+ remoteManager2.RegisterObserver(endpoint1, observer);
+
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext(new WritableString("ghi"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ /// <summary>
+ /// Tests one way communication between Remote Managers
+ /// Remote manager listens on a particular port
+ /// </summary>
+ [TestMethod]
+ public void TestWritableOneWayCommunicationClientOnly()
+ {
+ int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000);
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>())
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, listeningPort))
+ {
+ IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer = Observer.Create<WritableString>(queue.Add);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer);
+
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext(new WritableString("ghi"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ /// <summary>
+ /// Tests two way communications. Checks whether both sides are able to receive messages
+ /// </summary>
+ [TestMethod]
+ public void TestWritableTwoWayCommunication()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+ List<string> events1 = new List<string>();
+ List<string> events2 = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // Register observers for remote manager 1 and remote manager 2
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer1 = Observer.Create<WritableString>(queue1.Add);
+ var observer2 = Observer.Create<WritableString>(queue2.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+ // Remote manager 1 sends 3 events to remote manager 2
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("def"));
+ remoteObserver1.OnNext(new WritableString("ghi"));
+
+ // Remote manager 2 sends 4 events to remote manager 1
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ remoteObserver2.OnNext(new WritableString("jkl"));
+ remoteObserver2.OnNext(new WritableString("mno"));
+ remoteObserver2.OnNext(new WritableString("pqr"));
+ remoteObserver2.OnNext(new WritableString("stu"));
+
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ }
+
+ Assert.AreEqual(4, events1.Count);
+ Assert.AreEqual(3, events2.Count);
+ }
+
+ /// <summary>
+ /// Tests one way communication between 3 nodes.
+ /// nodes 1 and 2 send messages to node 3
+ /// </summary>
+ [TestMethod]
+ public void TestWritableCommunicationThreeNodesOneWay()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager3 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer = Observer.Create<WritableString>(queue.Add);
+ remoteManager3.RegisterObserver(remoteEndpoint, observer);
+
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+ remoteObserver2.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("def"));
+ remoteObserver2.OnNext(new WritableString("ghi"));
+ remoteObserver1.OnNext(new WritableString("jkl"));
+ remoteObserver2.OnNext(new WritableString("mno"));
+
+ for (int i = 0; i < 5; i++)
+ {
+ events.Add(queue.Take().Data);
+ }
+ }
+
+ Assert.AreEqual(5, events.Count);
+ }
+
+ /// <summary>
+ /// Tests one way communication between 3 nodes.
+ /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
+ /// </summary>
+ [TestMethod]
+ public void TestWritableCommunicationThreeNodesBothWays()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+ BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
+ List<string> events1 = new List<string>();
+ List<string> events2 = new List<string>();
+ List<string> events3 = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager3 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+
+ var observer = Observer.Create<WritableString>(queue1.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer);
+ var observer2 = Observer.Create<WritableString>(queue2.Add);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+ var observer3 = Observer.Create<WritableString>(queue3.Add);
+ remoteManager3.RegisterObserver(remoteEndpoint, observer3);
+
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+ // Observer 1 and 2 send messages to observer 3
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver1.OnNext(new WritableString("abc"));
+ remoteObserver2.OnNext(new WritableString("def"));
+ remoteObserver2.OnNext(new WritableString("def"));
+
+ // Observer 3 sends messages back to observers 1 and 2
+ var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
+
+ remoteObserver3A.OnNext(new WritableString("ghi"));
+ remoteObserver3A.OnNext(new WritableString("ghi"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+ remoteObserver3B.OnNext(new WritableString("jkl"));
+
+ events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take().Data);
+
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take().Data);
+
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take().Data);
+ }
+
+ Assert.AreEqual(2, events1.Count);
+ Assert.AreEqual(3, events2.Count);
+ Assert.AreEqual(5, events3.Count);
+ }
+
+ /// <summary>
+ /// Tests whether remote manager is able to send acknowledgement back
+ /// </summary>
+ [TestMethod]
+ public void TestWritableRemoteSenderCallback()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // Register handler for when remote manager 2 receives events; respond
+ // with an ack
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+
+ var receiverObserver = Observer.Create<WritableString>(
+ message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
+ remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
+
+ // Register handler for remote manager 1 to record the ack
+ var senderObserver = Observer.Create<WritableString>(queue.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
+
+ // Begin to send messages
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext(new WritableString("hello"));
+ remoteObserver1.OnNext(new WritableString("there"));
+ remoteObserver1.OnNext(new WritableString("buddy"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ Assert.AreEqual("received message: hello", events[0]);
+ Assert.AreEqual("received message: there", events[1]);
+ Assert.AreEqual("received message: buddy", events[2]);
+ }
+
+ /// <summary>
+ /// Test whether observer can be created with IRemoteMessage interface
+ /// </summary>
+ [TestMethod]
+ public void TestWritableRegisterObserverByType()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
+ var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
+ remoteManager2.RegisterObserver(observer);
+
+ // Remote manager 1 sends 3 events to remote manager 2
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext(new WritableString("ghi"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ /// <summary>
+ /// Tests whether we get the cached observer back for sending message without reinstantiating it
+ /// </summary>
+ [TestMethod]
+ public void TestWritableCachedConnection()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance<WritableString>(listeningAddress, 0))
+ {
+ var observer = Observer.Create<WritableString>(queue.Add);
+ IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+ remoteManager2.RegisterObserver(endpoint1, observer);
+
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext(new WritableString("abc"));
+ remoteObserver.OnNext(new WritableString("def"));
+
+ var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ cachedObserver.OnNext(new WritableString("ghi"));
+ cachedObserver.OnNext(new WritableString("jkl"));
+
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ events.Add(queue.Take().Data);
+ }
+
+ Assert.AreEqual(4, events.Count);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs
deleted file mode 100644
index 38a020f..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Impl/DefaultRemoteManagerFactory.cs
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Net;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.Wake.Impl
-{
- /// <summary>
- /// An implementation of IRemoteManagerFactory for DefaultRemoteManager.
- /// </summary>
- internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory
- {
- [Inject]
- private DefaultRemoteManagerFactory()
- {
- }
-
- public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec)
- {
-#pragma warning disable 618
- // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
- return new DefaultRemoteManager<T>(localAddress, port, codec);
-#pragma warning restore 618
- }
-
- public IRemoteManager<T> GetInstance<T>(ICodec<T> codec)
- {
-#pragma warning disable 618
- // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
- return new DefaultRemoteManager<T>(codec);
-#pragma warning restore 618
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 26f2fbf..a62d524 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -48,7 +48,8 @@ under the License.
<Compile Include="IEventHandler.cs" />
<Compile Include="IIdentifier.cs" />
<Compile Include="IIdentifierFactory.cs" />
- <Compile Include="Impl\DefaultRemoteManagerFactory.cs" />
+ <Compile Include="Remote\Impl\WritableRemoteManagerFactory.cs" />
+ <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
<Compile Include="Impl\LoggingEventHandler.cs" />
<Compile Include="Impl\MissingStartHandlerHandler.cs" />
<Compile Include="Impl\MultiEventHandler.cs" />
@@ -65,11 +66,15 @@ under the License.
<Compile Include="Remote\IDataWriter.cs" />
<Compile Include="Remote\Impl\StreamDataReader.cs" />
<Compile Include="Remote\Impl\StreamDataWriter.cs" />
+ <Compile Include="Remote\Impl\WritableRemoteManager.cs" />
<Compile Include="Remote\Impl\WritableLink.cs" />
+ <Compile Include="Remote\Impl\WritableObserverContainer.cs" />
+ <Compile Include="Remote\Impl\WritableRemoteEvent.cs" />
<Compile Include="Remote\Impl\WritableTransportClient.cs" />
<Compile Include="Remote\Impl\WritableTransportServer.cs" />
<Compile Include="Remote\IRemoteManagerFactory.cs" />
<Compile Include="Remote\IWritable.cs" />
+ <Compile Include="Remote\IWritableRemoteEvent.cs" />
<Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
<Compile Include="Remote\ICodec.cs" />
<Compile Include="Remote\ICodecFactory.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
new file mode 100644
index 0000000..40222aa
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Linq.Expressions;
+using System.Net;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+ /// <summary>
+ /// Interface for remote event
+ /// </summary>
+ /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
+ [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+ internal interface IWritableRemoteEvent<T> : IWritable where T : IWritable
+ {
+ /// <summary>
+ /// Local Endpoint
+ /// </summary>
+ IPEndPoint LocalEndPoint { get; set; }
+
+ /// <summary>
+ /// Remote Endpoint
+ /// </summary>
+ IPEndPoint RemoteEndPoint { get; set; }
+
+ T Value { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
new file mode 100644
index 0000000..38a020f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+ /// <summary>
+ /// An implementation of IRemoteManagerFactory for DefaultRemoteManager.
+ /// </summary>
+ internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory
+ {
+ [Inject]
+ private DefaultRemoteManagerFactory()
+ {
+ }
+
+ public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec)
+ {
+#pragma warning disable 618
+ // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+ return new DefaultRemoteManager<T>(localAddress, port, codec);
+#pragma warning restore 618
+ }
+
+ public IRemoteManager<T> GetInstance<T>(ICodec<T> codec)
+ {
+#pragma warning disable 618
+ // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+ return new DefaultRemoteManager<T>(codec);
+#pragma warning restore 618
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
new file mode 100644
index 0000000..9790e3e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using Org.Apache.REEF.Wake.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Stores registered IObservers for DefaultRemoteManager.
+ /// Can register and look up IObservers by remote IPEndPoint.
+ /// </summary>
+ /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam>
+ [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+ internal class WritableObserverContainer<T> : IObserver<TransportEvent<IWritableRemoteEvent<T>>> where T : IWritable
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(WritableObserverContainer<>));
+ private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
+ private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
+ private IObserver<T> _universalObserver;
+
+ /// <summary>
+ /// Constructs a new ObserverContainer used to manage remote IObservers.
+ /// </summary>
+ public WritableObserverContainer()
+ {
+ _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
+ _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+ {
+ if (remoteEndpoint.Address.Equals(IPAddress.Any))
+ {
+ _universalObserver = observer;
+ return Disposable.Create(() => { _universalObserver = null; });
+ }
+
+ _endpointMap[remoteEndpoint] = observer;
+ return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer));
+ }
+
+ /// <summary>
+ /// Registers an IObserver to handle incoming messages from a remote host
+ /// </summary>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+ {
+ _typeMap[typeof(T)] = observer;
+ return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
+ }
+
+ /// <summary>
+ /// Look up the IObserver for the registered IPEndPoint or event type
+ /// and execute the IObserver.
+ /// </summary>
+ /// <param name="transportEvent">The incoming remote event</param>
+ public void OnNext(TransportEvent<IWritableRemoteEvent<T>> transportEvent)
+ {
+ IWritableRemoteEvent<T> remoteEvent = transportEvent.Data;
+ remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
+ remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
+ T value = remoteEvent.Value;
+ bool handled = false;
+
+ IObserver<T> observer1;
+ IObserver<IRemoteMessage<T>> observer2;
+ if (_universalObserver != null)
+ {
+ _universalObserver.OnNext(value);
+ handled = true;
+ }
+ if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1))
+ {
+ // IObserver was registered by IPEndpoint
+ observer1.OnNext(value);
+ handled = true;
+ }
+ else if (_typeMap.TryGetValue(value.GetType(), out observer2))
+ {
+ // IObserver was registered by event type
+ IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
+ IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
+ observer2.OnNext(remoteMessage);
+ handled = true;
+ }
+
+ if (!handled)
+ {
+ throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ throw error;
+ }
+
+ public void OnCompleted()
+ {
+ Logger.Log(Level.Info, "Exiting the Writable Observer Container");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
new file mode 100644
index 0000000..b9702d6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Writable remote event class
+ /// </summary>
+ /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
+ [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+ internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> where T : IWritable
+ {
+ /// <summary>
+ /// Creates the Remote Event
+ /// </summary>
+ /// <param name="localEndpoint">Local Address</param>
+ /// <param name="remoteEndpoint">Remote Address</param>
+ /// <param name="value">Actual message</param>
+ public WritableRemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
+ {
+ LocalEndPoint = localEndpoint;
+ RemoteEndPoint = remoteEndpoint;
+ Value = value;
+ }
+
+ /// <summary>
+ /// Creates empty Remote Event
+ /// </summary>
+ public WritableRemoteEvent()
+ {
+ }
+
+ /// <summary>
+ /// Local Address
+ /// </summary>
+ public IPEndPoint LocalEndPoint { get; set; }
+
+ /// <summary>
+ /// Remote Address
+ /// </summary>
+ public IPEndPoint RemoteEndPoint { get; set; }
+
+ /// <summary>
+ /// The actual message
+ /// </summary>
+ public T Value { get; set; }
+
+ /// <summary>
+ /// Read the class fields.
+ /// </summary>
+ /// <param name="reader">The reader from which to read </param>
+ public void Read(IDataReader reader)
+ {
+ Value = Activator.CreateInstance<T>();
+ Value.Read(reader);
+ }
+
+ /// <summary>
+ /// Writes the class fields.
+ /// </summary>
+ /// <param name="writer">The writer to which to write</param>
+ public void Write(IDataWriter writer)
+ {
+ Value.Write(writer);
+ }
+
+ /// <summary>
+ /// Read the class fields.
+ /// </summary>
+ /// <param name="reader">The reader from which to read </param>
+ /// <param name="token">The cancellation token</param>
+ public async Task ReadAsync(IDataReader reader, CancellationToken token)
+ {
+ Value = Activator.CreateInstance<T>();
+ await Value.ReadAsync(reader, token);
+ }
+
+ /// <summary>
+ /// Writes the class fields.
+ /// </summary>
+ /// <param name="writer">The writer to which to write</param>
+ /// <param name="token">The cancellation token</param>
+ public async Task WriteAsync(IDataWriter writer, CancellationToken token)
+ {
+ await Value.WriteAsync(writer, token);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
new file mode 100644
index 0000000..285db71
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Manages incoming and outgoing messages between remote hosts.
+ /// </summary>
+ /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam>
+ [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+ public sealed class WritableRemoteManager<T> : IRemoteManager<T> where T : IWritable
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof (WritableRemoteManager<T>));
+
+ private readonly WritableObserverContainer<T> _observerContainer;
+ private readonly WritableTransportServer<IWritableRemoteEvent<T>> _server;
+ private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+
+ /// <summary>
+ /// Constructs a DefaultRemoteManager listening on the specified address and
+ /// a specific port.
+ /// </summary>
+ /// <param name="localAddress">The address to listen on</param>
+ /// <param name="port">The port to listen on</param>
+ [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
+ public WritableRemoteManager(IPAddress localAddress, int port)
+ {
+ if (localAddress == null)
+ {
+ throw new ArgumentNullException("localAddress");
+ }
+ if (port < 0)
+ {
+ throw new ArgumentException("Listening port must be greater than or equal to zero");
+ }
+
+ _observerContainer = new WritableObserverContainer<T>();
+ _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+ IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
+
+ // Begin to listen for incoming messages
+ _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer);
+ _server.Run();
+
+ LocalEndpoint = _server.LocalEndpoint;
+ Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+ }
+
+ /// <summary>
+ /// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
+ /// </summary>
+ [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
+ public WritableRemoteManager()
+ {
+ using (LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager"))
+ {
+ _observerContainer = new WritableObserverContainer<T>();
+ _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+ LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
+ Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+ }
+ }
+
+ /// <summary>
+ /// Gets the RemoteIdentifier for the DefaultRemoteManager
+ /// </summary>
+ public IRemoteIdentifier Identifier { get; private set; }
+
+ /// <summary>
+ /// Gets the local IPEndPoint for the DefaultRemoteManager
+ /// </summary>
+ public IPEndPoint LocalEndpoint { get; private set; }
+
+ /// <summary>
+ /// Returns an IObserver used to send messages to the remote host at
+ /// the specified IPEndpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+ /// <returns>An IObserver used to send messages to the remote host</returns>
+ public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+ if (id == null)
+ {
+ throw new ArgumentException("ID not supported");
+ }
+
+ return GetRemoteObserver(id.Addr);
+ }
+
+ /// <summary>
+ /// Returns an IObserver used to send messages to the remote host at
+ /// the specified IPEndpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+ /// <returns>An IObserver used to send messages to the remote host</returns>
+ public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ ProxyObserver remoteObserver;
+ if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
+ {
+ WritableTransportClient<IWritableRemoteEvent<T>> client =
+ new WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, _observerContainer);
+
+ remoteObserver = new ProxyObserver(client);
+ _cachedClients[remoteEndpoint] = remoteObserver;
+ }
+
+ return remoteObserver;
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ if (observer == null)
+ {
+ throw new ArgumentNullException("observer");
+ }
+
+ SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+ if (id == null)
+ {
+ throw new ArgumentException("ID not supported");
+ }
+
+ return RegisterObserver(id.Addr, observer);
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+ if (observer == null)
+ {
+ throw new ArgumentNullException("observer");
+ }
+
+ return _observerContainer.RegisterObserver(remoteEndpoint, observer);
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+ {
+ if (observer == null)
+ {
+ throw new ArgumentNullException("observer");
+ }
+
+ return _observerContainer.RegisterObserver(observer);
+ }
+
+ /// <summary>
+ /// Release all resources for the DefaultRemoteManager.
+ /// </summary>
+ public void Dispose()
+ {
+ foreach (ProxyObserver cachedClient in _cachedClients.Values)
+ {
+ cachedClient.Dispose();
+ }
+
+ if (_server != null)
+ {
+ _server.Dispose();
+ }
+ }
+
+ /// <summary>
+ /// Observer to send messages to connected remote host
+ /// </summary>
+ private class ProxyObserver : IObserver<T>, IDisposable
+ {
+ private readonly WritableTransportClient<IWritableRemoteEvent<T>> _client;
+
+ /// <summary>
+ /// Create new ProxyObserver
+ /// </summary>
+ /// <param name="client">The connected WritableTransport client used to send
+ /// messages to remote host</param>
+ public ProxyObserver(WritableTransportClient<IWritableRemoteEvent<T>> client)
+ {
+ _client = client;
+ }
+
+ /// <summary>
+ /// Send the message to the remote host
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ public void OnNext(T message)
+ {
+ IWritableRemoteEvent<T> remoteEvent = new WritableRemoteEvent<T>(_client.Link.LocalEndpoint,
+ _client.Link.RemoteEndpoint,
+ message);
+
+ _client.Send(remoteEvent);
+ }
+
+ /// <summary>
+ /// Close underlying WritableTransport client
+ /// </summary>
+ public void Dispose()
+ {
+ _client.Dispose();
+ }
+
+ public void OnError(Exception error)
+ {
+ throw error;
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a8ebe24e/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
new file mode 100644
index 0000000..6d3c4ad
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+ /// <summary>
+ /// WritableRemoteManagerFactory for WritableRemoteManager.
+ /// </summary>
+ [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
+ public sealed class WritableRemoteManagerFactory
+ {
+ [Inject]
+ private WritableRemoteManagerFactory()
+ {
+ }
+
+ public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+ {
+#pragma warning disable 618
+// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+ return new WritableRemoteManager<T>(localAddress, port);
+#pragma warning disable 618
+ }
+
+ public IRemoteManager<T> GetInstance<T>() where T : IWritable
+ {
+#pragma warning disable 618
+// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+ return new WritableRemoteManager<T>();
+#pragma warning disable 618
+ }
+ }
+}
\ No newline at end of file