You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:43:03 UTC

[19/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code base

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/Events/NamingUnregisterResponse.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Events/NamingUnregisterResponse.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Events/NamingUnregisterResponse.cs
new file mode 100644
index 0000000..9a3775d
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/Events/NamingUnregisterResponse.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.IO.Network.Naming.Events
+{
+    /// <summary>
+    /// Naming event answering an unregister request; it carries the request it was constructed from.
+    /// </summary>
+    internal class NamingUnregisterResponse : NamingEvent
+    {
+        public NamingUnregisterRequest Request { get; set; }
+
+        public NamingUnregisterResponse(NamingUnregisterRequest request)
+        {
+            this.Request = request;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/INameServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/INameServer.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/INameServer.cs
new file mode 100644
index 0000000..fc8e891
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/INameServer.cs
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.IO.Network.Naming
+{
+    /// <summary>
+    /// Service that manages names and IPEndPoints for well known hosts.
+    /// Can register, unregister, and look up IPEndPoints by string identifier (default implementation: NameServer).
+    /// </summary>
+    [DefaultImplementation(typeof(NameServer))]
+    public interface INameServer : IDisposable
+    {
+        /// <summary>
+        /// Listening endpoint for the NameServer
+        /// </summary>
+        IPEndPoint LocalEndpoint { get; }
+
+        /// <summary>
+        /// Looks up the IPEndPoints for each string identifier
+        /// </summary>
+        /// <param name="ids">The identifiers to look up</param>
+        /// <returns>A list of NameAssignments, each pairing an identifier
+        /// that was searched for with its mapped IPEndPoint</returns>
+        List<NameAssignment> Lookup(List<string> ids);
+
+        /// <summary>
+        /// Gets all of the registered identifier/endpoint pairs.
+        /// </summary>
+        /// <returns>A list of all of the registered identifiers and their
+        /// mapped IPEndPoints</returns>
+        List<NameAssignment> GetAll();
+
+        /// <summary>
+        /// Registers the string identifier with the given IPEndPoint
+        /// </summary>
+        /// <param name="id">The string identifier</param>
+        /// <param name="endpoint">The endpoint to map to the identifier</param>
+        void Register(string id, IPEndPoint endpoint);
+
+        /// <summary>
+        /// Unregisters the given identifier with the NameServer
+        /// </summary>
+        /// <param name="id">The identifier to unregister</param>
+        void Unregister(string id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs
new file mode 100644
index 0000000..b100edb
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming.Codec;
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Impl;
+using Org.Apache.Reef.Wake.RX;
+using Org.Apache.Reef.Wake.RX.Impl;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Reactive;
+
+namespace Org.Apache.Reef.IO.Network.Naming
+{
+    /// <summary>
+    /// Client for the Reef name service.
+    /// Used to register, unregister, and look up IPEndPoints of known hosts.
+    /// </summary>
+    public class NameClient : INameClient
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(NameClient));
+
+        private BlockingCollection<NamingLookupResponse> _lookupResponseQueue;
+        private BlockingCollection<NamingGetAllResponse> _getAllResponseQueue;
+        private BlockingCollection<NamingRegisterResponse> _registerResponseQueue;
+        private BlockingCollection<NamingUnregisterResponse> _unregisterResponseQueue;
+
+        private TransportClient<NamingEvent> _client;
+
+        private NameLookupClient _lookupClient;
+        private NameRegisterClient _registerClient;
+
+        private bool _disposed;
+
+        /// <summary>
+        /// Constructs a NameClient to register, lookup, and unregister IPEndpoints
+        /// with the NameServer.
+        /// </summary>
+        /// <param name="remoteAddress">The ip address of the NameServer (parsed with IPAddress.Parse)</param>
+        /// <param name="remotePort">The port of the NameServer</param>
+        [Inject]
+        public NameClient(
+            [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string remoteAddress,
+            [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int remotePort)
+        {
+            IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse(remoteAddress), remotePort);
+            Initialize(remoteEndpoint);
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Constructs a NameClient to register, lookup, and unregister IPEndpoints
+        /// with the NameServer.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the NameServer</param>
+        public NameClient(IPEndPoint remoteEndpoint)
+        {
+            Initialize(remoteEndpoint);
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Synchronously registers the identifier with the NameService.  
+        /// Overwrites the previous mapping if the identifier has already 
+        /// been registered.
+        /// </summary>
+        /// <param name="id">The key used to map the remote endpoint</param>
+        /// <param name="endpoint">The endpoint to map</param>
+        public void Register(string id, IPEndPoint endpoint)
+        {
+            if (id == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("id"), _logger);
+            }
+            if (endpoint == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("endpoint"), _logger);
+            }
+
+            _logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint);
+            _registerClient.Register(id, endpoint);
+        }
+
+        /// <summary>
+        /// Forwards an unregister request for the identifier to the register client.
+        /// </summary>
+        /// <param name="id">The identifier to unregister</param>
+        public void Unregister(string id)
+        {
+            if (id == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("id"), _logger);
+            }
+
+            _logger.Log(Level.Info, "Unregistering id: " + id);
+            _registerClient.Unregister(id);
+        }
+
+        /// <summary>
+        /// Synchronously looks up the IPEndpoint for the registered identifier.
+        /// </summary>
+        /// <param name="id">The identifier to look up</param>
+        /// <returns>The IPEndpoint of the first assignment returned for the identifier,
+        /// or null if the lookup returned no assignments</returns>
+        public IPEndPoint Lookup(string id)
+        {
+            if (id == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("id"), _logger);
+            }
+
+            List<NameAssignment> assignments = Lookup(new List<string> { id });
+            if (assignments != null && assignments.Count > 0)
+            {
+                return assignments.First().Endpoint;
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Synchronously looks up the IPEndpoint for each of the registered identifiers in the list.
+        /// </summary>
+        /// <param name="ids">The list of identifiers to look up; must be neither null nor empty</param>
+        /// <returns>The list of NameAssignments, each pairing an identifier with its
+        /// mapped IPEndpoint.  NOTE(review): NameServer.Lookup skips identifiers that
+        /// are not registered, so they are presumably absent from this list rather
+        /// than present with a null IPEndpoint - confirm against the server in use.</returns>
+        public List<NameAssignment> Lookup(List<string> ids)
+        {
+            if (ids == null || ids.Count == 0)
+            {
+                Exceptions.Throw(new ArgumentNullException("ids", "ids cannot be null or empty"), _logger);
+            }
+
+            _logger.Log(Level.Verbose, "Looking up ids");
+            List<NameAssignment> assignments = _lookupClient.Lookup(ids);
+            if (assignments != null)
+            {
+                return assignments;
+            }
+            Exceptions.Throw(new WakeRuntimeException("NameClient failed to look up ids."), _logger);
+            return null;  // Exceptions.Throw is expected to throw, so this is not reached; it satisfies the compiler.
+        }
+
+        /// <summary>
+        /// Disposes the current transport client and re-initializes against the given endpoint.
+        /// </summary>
+        /// <param name="serverEndpoint">The new server endpoint to connect to</param>
+        public void Restart(IPEndPoint serverEndpoint)
+        {
+            _client.Dispose();
+            Initialize(serverEndpoint);
+        }
+
+        /// <summary>
+        /// Releases resources used by NameClient
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected virtual void Dispose(bool disposing)
+        {
+            if (_disposed)
+            {
+                return;
+            }
+            if (disposing)
+            {
+                _client.Dispose();
+            }
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Creates fresh response queues, a new transport client connected to the given
+        /// endpoint, and the lookup/register helpers that share them.
+        /// </summary>
+        /// <param name="serverEndpoint">The NameServer endpoint to connect to.</param>
+        private void Initialize(IPEndPoint serverEndpoint)
+        {
+            _lookupResponseQueue = new BlockingCollection<NamingLookupResponse>();
+            _getAllResponseQueue = new BlockingCollection<NamingGetAllResponse>();
+            _registerResponseQueue = new BlockingCollection<NamingRegisterResponse>();
+            _unregisterResponseQueue = new BlockingCollection<NamingUnregisterResponse>();
+
+            IObserver<TransportEvent<NamingEvent>> clientHandler = CreateClientHandler();
+            ICodec<NamingEvent> codec = CreateClientCodec();
+            _client = new TransportClient<NamingEvent>(serverEndpoint, codec, clientHandler);
+
+            _lookupClient = new NameLookupClient(_client, _lookupResponseQueue, _getAllResponseQueue);
+            _registerClient = new NameRegisterClient(_client, _registerResponseQueue, _unregisterResponseQueue);
+        }
+
+        /// <summary>
+        /// Create handler that adds each response type from the NameServer to its matching queue.
+        /// </summary>
+        /// <returns>The client handler to manage responses from the NameServer</returns>
+        private IObserver<TransportEvent<NamingEvent>> CreateClientHandler()
+        {
+            PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>();
+            subject.Subscribe(Observer.Create<NamingLookupResponse>(msg => HandleResponse(_lookupResponseQueue, msg)));
+            subject.Subscribe(Observer.Create<NamingGetAllResponse>(msg => HandleResponse(_getAllResponseQueue, msg)));
+            subject.Subscribe(Observer.Create<NamingRegisterResponse>(msg => HandleResponse(_registerResponseQueue, msg)));
+            subject.Subscribe(Observer.Create<NamingUnregisterResponse>(msg => HandleResponse(_unregisterResponseQueue, msg)));
+            return new ClientObserver(subject);
+        }
+
+        /// <summary>
+        /// Create the codec used to serialize/deserialize NamingEvent messages
+        /// </summary>
+        /// <returns>The serialization codec</returns>
+        private ICodec<NamingEvent> CreateClientCodec()
+        {
+            MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>();
+            codec.Register(new NamingLookupRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupRequest");
+            codec.Register(new NamingLookupResponseCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupResponse");
+            NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec();
+            codec.Register(requestCodec, "org.apache.reef.io.network.naming.serialization.NamingRegisterRequest");
+            codec.Register(new NamingRegisterResponseCodec(requestCodec), "org.apache.reef.io.network.naming.serialization.NamingRegisterResponse");
+            codec.Register(new NamingUnregisterRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest");
+            return codec;
+        }
+
+        private void HandleResponse<T>(BlockingCollection<T> queue, T message)
+        {
+            queue.Add(message);
+        }
+
+        /// <summary>
+        /// Helper class used to handle response events from the NameServer.
+        /// Copies the transport link onto the event and forwards it to the
+        /// wrapped handler.
+        /// </summary>
+        private class ClientObserver : AbstractObserver<TransportEvent<NamingEvent>>
+        {
+            private readonly IObserver<NamingEvent> _handler;
+
+            public ClientObserver(IObserver<NamingEvent> handler)
+            {
+                _handler = handler;
+            }
+
+            public override void OnNext(TransportEvent<NamingEvent> value)
+            {
+                NamingEvent message = value.Data;
+                message.Link = value.Link;
+                _handler.OnNext(message);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs
new file mode 100644
index 0000000..9443704
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Threading;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.Wake.Remote.Impl;
+
+namespace Org.Apache.Reef.IO.Network.Naming
+{
+    /// <summary>
+    /// Helper class to send lookup events to the name server
+    /// </summary>
+    internal class NameLookupClient
+    {
+        private readonly TransportClient<NamingEvent> _client;
+        private readonly BlockingCollection<NamingLookupResponse> _lookupResponseQueue;
+        private readonly BlockingCollection<NamingGetAllResponse> _getAllResponseQueue;
+
+        /// <summary>
+        /// Constructs a new NameLookupClient.
+        /// </summary>
+        /// <param name="client">The transport client used to connect to the NameServer</param>
+        /// <param name="lookupQueue">The queue from which lookup responses
+        /// are taken</param>
+        /// <param name="getAllQueue">The queue from which GetAll responses
+        /// are taken</param>
+        public NameLookupClient(TransportClient<NamingEvent> client,
+                                BlockingCollection<NamingLookupResponse> lookupQueue,
+                                BlockingCollection<NamingGetAllResponse> getAllQueue)
+        {
+            _client = client;
+            _lookupResponseQueue = lookupQueue;
+            _getAllResponseQueue = getAllQueue;
+        }
+
+        /// <summary>
+        /// Look up the IPEndPoint that has been registered with the NameServer using
+        /// the given identifier as the key.
+        /// </summary>
+        /// <param name="id">The id for the IPEndPoint</param>
+        /// <param name="token">Not observed by the current implementation; the call blocks until a response arrives</param>
+        /// <returns>The registered IPEndpoint, or null if the response contains
+        /// no assignment for the identifier.</returns>
+        public IPEndPoint Lookup(string id, CancellationToken token)
+        {
+            List<string> ids = new List<string> { id };
+            List<NameAssignment> assignment = Lookup(ids);
+            return (assignment == null || assignment.Count == 0) ? null : assignment.First().Endpoint;
+        }
+
+        /// <summary>
+        /// Sends a lookup request and blocks until a response is taken from the lookup queue.
+        /// </summary>
+        /// <param name="ids">The list of ids to look up</param>
+        /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint
+        /// pairs</returns>
+        public List<NameAssignment> Lookup(List<string> ids)
+        {
+            _client.Send(new NamingLookupRequest(ids));
+            NamingLookupResponse response = _lookupResponseQueue.Take();
+            return response.NameAssignments;
+        }
+
+        /// <summary>
+        /// Sends a GetAll request and blocks until a response is taken from the GetAll queue.
+        /// </summary>
+        /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint
+        /// pairs</returns>
+        public List<NameAssignment> GetAll()
+        {
+            _client.Send(new NamingGetAllRequest());
+            NamingGetAllResponse response = _getAllResponseQueue.Take();
+            return response.Assignments;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs
new file mode 100644
index 0000000..4dbf8f4
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Net;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.Wake.Remote.Impl;
+
+namespace Org.Apache.Reef.IO.Network.Naming
+{
+    /// <summary>
+    /// Helper class to send register and unregister events to the NameServer.
+    /// </summary>
+    internal class NameRegisterClient
+    {
+        private readonly TransportClient<NamingEvent> _client;
+        private readonly BlockingCollection<NamingRegisterResponse> _registerResponseQueue;
+        private readonly BlockingCollection<NamingUnregisterResponse> _unregisterResponseQueue; // stored but never read in this class
+
+        public NameRegisterClient(TransportClient<NamingEvent> client,
+                                  BlockingCollection<NamingRegisterResponse> registerQueue,
+                                  BlockingCollection<NamingUnregisterResponse> unregisterQueue)
+        {
+            _client = client;
+            _registerResponseQueue = registerQueue;
+            _unregisterResponseQueue = unregisterQueue;
+        }
+
+        /// <summary>
+        /// Sends a register request and blocks until a response is taken from the register queue.
+        /// </summary>
+        /// <param name="id">The identifier</param>
+        /// <param name="endpoint">The endpoint</param>
+        public void Register(string id, IPEndPoint endpoint)
+        {
+            NameAssignment assignment = new NameAssignment(id, endpoint);
+            _client.Send(new NamingRegisterRequest(assignment));
+            _registerResponseQueue.Take();
+        }
+
+        /// <summary>
+        /// Sends an unregister request to the NameServer; does not wait for a response.
+        /// </summary>
+        /// <param name="id">The identifier to unregister</param>
+        public void Unregister(string id)
+        {
+            _client.Send(new NamingUnregisterRequest(id));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs
new file mode 100644
index 0000000..6d79d34
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming.Codec;
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.IO.Network.Naming.Observers;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Impl;
+using Org.Apache.Reef.Wake.RX;
+using Org.Apache.Reef.Wake.RX.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+
+namespace Org.Apache.Reef.IO.Network.Naming
+{
+    /// <summary>
+    /// Service that manages names and IPEndpoints for well known hosts.
+    /// Can register, unregister, and look up IPEndPoints using a string identifier.
+    /// </summary>
+    public class NameServer : INameServer
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(NameServer));
+
+        private readonly TransportServer<NamingEvent> _server;
+        private readonly Dictionary<string, IPEndPoint> _idToAddrMap; // NOTE(review): no locking here - confirm handlers never run concurrently
+
+        /// <summary>
+        /// Create a new NameServer to run on the specified port.
+        /// </summary>
+        /// <param name="port">The port to listen for incoming connections on.</param>
+        [Inject]
+        public NameServer([Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port)
+        {
+            IObserver<TransportEvent<NamingEvent>> handler = CreateServerHandler();
+            _idToAddrMap = new Dictionary<string, IPEndPoint>();
+            ICodec<NamingEvent> codec = CreateServerCodec();
+
+            // Start transport server, get listening IP endpoint
+            _logger.Log(Level.Info, "Starting naming server");
+            _server = new TransportServer<NamingEvent>(port, handler, codec);
+            _server.Run();
+            LocalEndpoint = _server.LocalEndpoint;
+        }
+
+        public IPEndPoint LocalEndpoint { get; private set; } // copied from the transport server after Run()
+
+        /// <summary>
+        /// Looks up the IPEndpoints for each string identifier
+        /// </summary>
+        /// <param name="ids">The IDs to look up</param>
+        /// <returns>A list of NameAssignments for the identifiers that are currently
+        /// registered; identifiers without a mapping are omitted from the list</returns>
+        public List<NameAssignment> Lookup(List<string> ids)
+        {
+            if (ids == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("ids"), _logger);
+            }
+
+            return ids.Where(id => _idToAddrMap.ContainsKey(id))
+                      .Select(id => new NameAssignment(id, _idToAddrMap[id]))
+                      .ToList();
+        }
+
+        /// <summary>
+        /// Gets all of the registered identifier/endpoint pairs.
+        /// </summary>
+        /// <returns>A list of all of the registered identifiers and their
+        /// mapped IPEndpoints</returns>
+        public List<NameAssignment> GetAll()
+        {
+            return _idToAddrMap.Select(pair => new NameAssignment(pair.Key, pair.Value)).ToList();
+        }
+
+        /// <summary>
+        /// Registers the string identifier with the given IPEndpoint, replacing any existing mapping
+        /// </summary>
+        /// <param name="id">The string identifier</param>
+        /// <param name="endpoint">The mapped endpoint</param>
+        public void Register(string id, IPEndPoint endpoint)
+        {
+            if (id == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("id"), _logger);
+            }
+            if (endpoint == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("endpoint"), _logger);
+            }
+
+            _logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint);
+            _idToAddrMap[id] = endpoint;
+        }
+
+        /// <summary>
+        /// Removes the mapping for the given identifier; does nothing if it is not registered
+        /// </summary>
+        /// <param name="id">The identifier to unregister</param>
+        public void Unregister(string id)
+        {
+            if (id == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("id"), _logger);
+            }
+
+            _logger.Log(Level.Info, "Unregistering id: " + id);
+            _idToAddrMap.Remove(id);
+        }
+
+        /// <summary>
+        /// Disposes the underlying transport server
+        /// </summary>
+        public void Dispose()
+        {
+            _server.Dispose();
+        }
+
+        /// <summary>
+        /// Create the handler to manage incoming NamingEvent types
+        /// </summary>
+        /// <returns>The server handler</returns>
+        private IObserver<TransportEvent<NamingEvent>> CreateServerHandler()
+        {
+            PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>();
+            subject.Subscribe(new NamingLookupRequestObserver(this));
+            subject.Subscribe(new NamingGetAllRequestObserver(this));
+            subject.Subscribe(new NamingRegisterRequestObserver(this));
+            subject.Subscribe(new NamingUnregisterRequestObserver(this));
+            return new ServerHandler(subject);
+        }
+
+        /// <summary>
+        /// Create the codec used to serialize/deserialize NamingEvent messages
+        /// </summary>
+        /// <returns>The serialization codec</returns>
+        private ICodec<NamingEvent> CreateServerCodec()
+        {
+            MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>();
+            codec.Register(new NamingLookupRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupRequest");
+            codec.Register(new NamingLookupResponseCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupResponse");
+            NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec();
+            codec.Register(requestCodec, "org.apache.reef.io.network.naming.serialization.NamingRegisterRequest");
+            codec.Register(new NamingRegisterResponseCodec(requestCodec), "org.apache.reef.io.network.naming.serialization.NamingRegisterResponse");
+            codec.Register(new NamingUnregisterRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest");
+            return codec;
+        }
+
+        [NamedParameter("Port for the NameServer to listen on")]
+        public class Port : Name<int> // NOTE(review): not referenced in this class; the constructor binds NamingConfigurationOptions.NameServerPort
+        {
+        }
+
+        /// <summary>
+        /// Class used to handle incoming NamingEvent messages.
+        /// Copies the transport link onto the event and forwards it to the wrapped handler
+        /// </summary>
+        private class ServerHandler : AbstractObserver<TransportEvent<NamingEvent>>
+        {
+            private readonly IObserver<NamingEvent> _handler;
+
+            public ServerHandler(IObserver<NamingEvent> handler)
+            {
+                _handler = handler;
+            }
+
+            public override void OnNext(TransportEvent<NamingEvent> value)
+            {
+                NamingEvent message = value.Data;
+                message.Link = value.Link;
+                _handler.OnNext(message);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs
new file mode 100644
index 0000000..95ab878
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Util;
+
+namespace Org.Apache.Reef.Naming
+{
+    /// <summary>
+    /// Configuration module builder for the naming parameters. Binds the
+    /// NameServer address and port named parameters to required parameters.
+    /// </summary>
+    public class NamingConfiguration : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// Required parameter bound to NamingConfigurationOptions.NameServerAddress.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly RequiredParameter<string> NameServerAddress = new RequiredParameter<string>();
+
+        /// <summary>
+        /// Required parameter bound to NamingConfigurationOptions.NameServerPort.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly RequiredParameter<int> NameServerPort = new RequiredParameter<int>();
+
+        /// <summary>
+        /// Builds a new ConfigurationModule on every access, with both named
+        /// parameters bound to the required parameters declared above.
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule
+        {
+            get
+            {
+                var builder = new NamingConfiguration()
+                    .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerAddress>.Class, NameServerAddress)
+                    .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerPort>.Class, NameServerPort);
+
+                return builder.Build();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs
new file mode 100644
index 0000000..e010f11
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Naming
+{
+    /// <summary>
+    /// Container for the named parameters that describe where the NameServer
+    /// can be reached.
+    /// </summary>
+    public class NamingConfigurationOptions
+    {
+        /// <summary>
+        /// Named parameter (string): IP address of the NameServer.
+        /// </summary>
+        [NamedParameter("IP address of NameServer")]
+        public class NameServerAddress : Name<string>
+        {
+        }
+
+        /// <summary>
+        /// Named parameter (int): port of the NameServer.
+        /// </summary>
+        [NamedParameter("Port of NameServer")]
+        public class NameServerPort : Name<int>
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs
new file mode 100644
index 0000000..e32ede7
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.Wake.RX;
+
+namespace Org.Apache.Reef.IO.Network.Naming.Observers
+{
+    /// <summary>
+    /// Handler for NameService events of type NamingGetAllRequest.
+    /// Gets all of the identifiers and their mapped IPEndpoints registered
+    /// with the NameServer.
+    /// </summary>
+    internal class NamingGetAllRequestObserver : AbstractObserver<NamingGetAllRequest>
+    {
+        // Assigned once in the constructor and never replaced afterwards.
+        private readonly NameServer _server;
+
+        /// <summary>
+        /// Create an observer that answers requests from the given NameServer.
+        /// </summary>
+        /// <param name="server">The NameServer queried for its assignments</param>
+        public NamingGetAllRequestObserver(NameServer server)
+        {
+            _server = server;
+        }
+
+        /// <summary>
+        /// Fetch every assignment from the NameServer and write them, wrapped
+        /// in a NamingGetAllResponse, to the request's Link.
+        /// </summary>
+        /// <param name="value">The get-all request event</param>
+        public override void OnNext(NamingGetAllRequest value)
+        {
+            List<NameAssignment> assignments = _server.GetAll();
+            value.Link.Write(new NamingGetAllResponse(assignments));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs
new file mode 100644
index 0000000..07cabc0
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.Wake.RX;
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.IO.Network.Naming.Observers
+{
+    /// <summary>
+    /// Handler for looking up IPEndpoints registered with the NameServer
+    /// </summary>
+    internal class NamingLookupRequestObserver : AbstractObserver<NamingLookupRequest>
+    {
+        // Assigned once in the constructor and never replaced afterwards.
+        private readonly NameServer _server;
+
+        /// <summary>
+        /// Create an observer that resolves lookups against the given NameServer.
+        /// </summary>
+        /// <param name="server">The NameServer used to resolve identifiers</param>
+        public NamingLookupRequestObserver(NameServer server)
+        {
+            _server = server;
+        }
+
+        /// <summary>
+        /// Look up the assignments for the requested identifiers and write them,
+        /// wrapped in a NamingLookupResponse, to the request's Link.
+        /// </summary>
+        /// <param name="value">The lookup request event</param>
+        public override void OnNext(NamingLookupRequest value)
+        {
+            List<NameAssignment> assignments = _server.Lookup(value.Identifiers);
+            value.Link.Write(new NamingLookupResponse(assignments));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs
new file mode 100644
index 0000000..ccce0b3
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.Wake.RX;
+
+namespace Org.Apache.Reef.IO.Network.Naming.Observers
+{
+    /// <summary>
+    /// Handler for registering an identifier and endpoint with the Name Service
+    /// </summary>
+    internal class NamingRegisterRequestObserver : AbstractObserver<NamingRegisterRequest>
+    {
+        // Assigned once in the constructor and never replaced afterwards.
+        private readonly NameServer _server;
+
+        /// <summary>
+        /// Create an observer that registers assignments with the given NameServer.
+        /// </summary>
+        /// <param name="server">The NameServer that stores the registrations</param>
+        public NamingRegisterRequestObserver(NameServer server)
+        {
+            _server = server;
+        }
+
+        /// <summary>
+        /// Register the identifier and IPEndpoint with the NameServer and write
+        /// a NamingRegisterResponse to the request's Link.
+        /// </summary>
+        /// <param name="value">The register request event</param>
+        public override void OnNext(NamingRegisterRequest value)
+        {
+            NameAssignment assignment = value.NameAssignment;
+            _server.Register(assignment.Identifier, assignment.Endpoint);
+
+            // The response is constructed from the original request.
+            value.Link.Write(new NamingRegisterResponse(value));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs
new file mode 100644
index 0000000..c034269
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.IO.Network.Naming.Events;
+using Org.Apache.Reef.Wake.RX;
+
+namespace Org.Apache.Reef.IO.Network.Naming.Observers
+{
+    /// <summary>
+    /// Handler for unregistering an identifier with the NameServer
+    /// </summary>
+    internal class NamingUnregisterRequestObserver : AbstractObserver<NamingUnregisterRequest>
+    {
+        // Assigned once in the constructor and never replaced afterwards.
+        private readonly NameServer _server;
+
+        /// <summary>
+        /// Create an observer that unregisters identifiers from the given NameServer.
+        /// </summary>
+        /// <param name="server">The NameServer that holds the registrations</param>
+        public NamingUnregisterRequestObserver(NameServer server)
+        {
+            _server = server;
+        }
+
+        /// <summary>
+        /// Unregister the identifier with the NameServer.
+        /// Nothing is written back to the request's Link.
+        /// </summary>
+        /// <param name="value">The unregister request event</param>
+        public override void OnNext(NamingUnregisterRequest value)
+        {
+            // Don't send a response
+            _server.Unregister(value.Identifier);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Network.csproj b/lang/cs/Source/REEF/reef-io/Network/Network.csproj
new file mode 100644
index 0000000..6ab99d8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/Network.csproj
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProjectGuid>{883CE800-6A6A-4E0A-B7FE-C054F4F2C1DC}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Org.Apache.Reef.IO.Network</RootNamespace>
+    <AssemblyName>Org.Apache.Reef.IO.Network</AssemblyName>
+    <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\..\</SolutionDir>
+    <RestorePackages>true</RestorePackages>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>..\..\..\..\bin\Debug\Org.Apache.Reef.IO.Network\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <!-- Output folder is named after the assembly, matching the Debug configuration. -->
+    <OutputPath>..\..\..\..\bin\Release\Org.Apache.Reef.IO.Network\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="Microsoft.Hadoop.Avro">
+      <HintPath>..\..\..\..\packages\Microsoft.Hadoop.Avro.1.4.0.0\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath>
+    </Reference>
+    <Reference Include="Newtonsoft.Json">
+      <HintPath>..\..\..\..\packages\Newtonsoft.Json.6.0.8\lib\net45\Newtonsoft.Json.dll</HintPath>
+    </Reference>
+    <Reference Include="protobuf-net">
+      <HintPath>..\..\..\..\packages\protobuf-net.2.0.0.668\lib\net40\protobuf-net.dll</HintPath>
+    </Reference>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Reactive.Core">
+      <HintPath>..\..\..\..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
+    </Reference>
+    <Reference Include="System.Reactive.Interfaces">
+      <HintPath>..\..\..\..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+    </Reference>
+    <Reference Include="System.Runtime.Serialization" />
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" />
+    <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" />
+    <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" />
+    <Compile Include="Naming\Codec\NamingRegisterResponseCodec.cs" />
+    <Compile Include="Naming\Codec\NamingUnregisterRequestCodec.cs" />
+    <Compile Include="Naming\Contracts\AvroNamingAssignment.cs" />
+    <Compile Include="Naming\Contracts\AvroNamingLookupRequest.cs" />
+    <Compile Include="Naming\Contracts\AvroNamingLookupResponse.cs" />
+    <Compile Include="Naming\Contracts\AvroNamingRegisterRequest.cs" />
+    <Compile Include="Naming\Contracts\AvroNamingUnRegisterRequest.cs" />
+    <Compile Include="Naming\Events\NamingEvent.cs" />
+    <Compile Include="Naming\Events\NamingGetAllRequest.cs" />
+    <Compile Include="Naming\Events\NamingGetAllResponse.cs" />
+    <Compile Include="Naming\Events\NamingLookupRequest.cs" />
+    <Compile Include="Naming\Events\NamingLookupResponse.cs" />
+    <Compile Include="Naming\Events\NamingRegisterRequest.cs" />
+    <Compile Include="Naming\Events\NamingRegisterResponse.cs" />
+    <Compile Include="Naming\Events\NamingUnregisterRequest.cs" />
+    <Compile Include="Naming\Events\NamingUnregisterResponse.cs" />
+    <Compile Include="Naming\INameServer.cs" />
+    <Compile Include="Naming\NameClient.cs" />
+    <Compile Include="Naming\NameLookupClient.cs" />
+    <Compile Include="Naming\NameRegisterClient.cs" />
+    <Compile Include="Naming\NameServer.cs" />
+    <Compile Include="Naming\NamingConfiguration.cs" />
+    <Compile Include="Naming\NamingConfigurationOptions.cs" />
+    <Compile Include="Naming\Observers\NamingGetAllRequestObserver.cs" />
+    <Compile Include="Naming\Observers\NamingLookupRequestObserver.cs" />
+    <Compile Include="Naming\Observers\NamingRegisterRequestObserver.cs" />
+    <Compile Include="Naming\Observers\NamingUnregisterRequestObserver.cs" />
+    <Compile Include="NetworkService\Codec\ControlMessageCodec.cs" />
+    <Compile Include="NetworkService\Codec\NsMessageCodec.cs" />
+    <Compile Include="NetworkService\Codec\NsMessageProto.cs" />
+    <Compile Include="NetworkService\ControlMessage.cs" />
+    <Compile Include="NetworkService\IConnection.cs" />
+    <Compile Include="NetworkService\INetworkService.cs" />
+    <Compile Include="NetworkService\NetworkService.cs" />
+    <Compile Include="NetworkService\NetworkServiceConfiguration.cs" />
+    <Compile Include="NetworkService\NetworkServiceOptions.cs" />
+    <Compile Include="NetworkService\NsConnection.cs" />
+    <Compile Include="NetworkService\NsMessage.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Utilities\BlockingCollectionExtensions.cs" />
+    <Compile Include="Utilities\Utils.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="packages.config" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="..\..\..\Tang\Tang\Tang.csproj">
+      <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+      <Name>Tang</Name>
+    </ProjectReference>
+    <ProjectReference Include="..\..\..\Utilities\Utilities.csproj">
+      <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+      <Name>Utilities</Name>
+    </ProjectReference>
+    <ProjectReference Include="..\..\..\WAKE\Wake\Wake.csproj">
+      <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+      <Name>Wake</Name>
+    </ProjectReference>
+    <ProjectReference Include="..\..\reef-common\ReefCommon\ReefCommon.csproj">
+      <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+      <Name>ReefCommon</Name>
+    </ProjectReference>
+    <ProjectReference Include="..\..\reef-common\ReefDriver\ReefDriver.csproj">
+      <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project>
+      <Name>ReefDriver</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs
new file mode 100644
index 0000000..bfdb708
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Remote;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService.Codec
+{
+    /// <summary>
+    /// Codec that serializes a ControlMessage as the bytes of its int value,
+    /// using BitConverter for both directions.
+    /// </summary>
+    public class ControlMessageCodec : ICodec<ControlMessage>
+    {
+        [Inject]
+        public ControlMessageCodec()
+        {
+        }
+
+        /// <summary>
+        /// Convert the control message to the byte representation of its int value.
+        /// </summary>
+        /// <param name="message">The control message to serialize</param>
+        /// <returns>The bytes produced by BitConverter for the int value</returns>
+        public byte[] Encode(ControlMessage message)
+        {
+            int rawValue = (int) message;
+            return BitConverter.GetBytes(rawValue);
+        }
+
+        /// <summary>
+        /// Read an int from the start of the byte array and cast it back to a
+        /// ControlMessage.
+        /// </summary>
+        /// <param name="data">The serialized control message</param>
+        /// <returns>The decoded control message</returns>
+        public ControlMessage Decode(byte[] data)
+        {
+            int rawValue = BitConverter.ToInt32(data, 0);
+            return (ControlMessage) rawValue;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs
new file mode 100644
index 0000000..c2f3f0f
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Wake;
+using Org.Apache.Reef.Wake.Remote;
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Linq;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService.Codec
+{
+    /// <summary>
+    /// Codec to serialize NsMessages for NetworkService.
+    /// The message is converted to an NsMessageProto and written with the
+    /// protobuf serializer; each payload item is encoded with the inner codec.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class NsMessageCodec<T> : ICodec<NsMessage<T>>
+    {
+        // Both collaborators are assigned once in the constructor and never replaced.
+        private readonly ICodec<T> _codec;
+        private readonly IIdentifierFactory _idFactory;
+
+        /// <summary>
+        /// Create new NsMessageCodec.
+        /// </summary>
+        /// <param name="codec">The codec used to serialize message data</param>
+        /// <param name="idFactory">Used to create identifier from string.</param>
+        public NsMessageCodec(ICodec<T> codec, IIdentifierFactory idFactory)
+        {
+            _codec = codec;
+            _idFactory = idFactory;
+        }
+
+        /// <summary>
+        /// Serialize the NsMessage.
+        /// </summary>
+        /// <param name="obj">The object to serialize</param>
+        /// <returns>The serialized object in byte array form</returns>
+        public byte[] Encode(NsMessage<T> obj)
+        {
+            NsMessageProto proto = NsMessageProto.Create(obj, _codec);
+            using (var stream = new MemoryStream())
+            {
+                Serializer.Serialize(stream, proto);
+                return stream.ToArray();
+            }
+        }
+
+        /// <summary>
+        /// Deserialize the byte array into NsMessage.
+        /// </summary>
+        /// <param name="data">The serialized byte array</param>
+        /// <returns>The deserialized NsMessage</returns>
+        public NsMessage<T> Decode(byte[] data)
+        {
+            using (var stream = new MemoryStream(data))
+            {
+                NsMessageProto proto = Serializer.Deserialize<NsMessageProto>(stream);
+
+                // Identifiers travel as strings and are rebuilt through the factory.
+                IIdentifier sourceId = _idFactory.Create(proto.SourceId);
+                IIdentifier destId = _idFactory.Create(proto.DestId);
+                NsMessage<T> message = new NsMessage<T>(sourceId, destId);
+
+                // The Select is lazy; AddRange enumerates it and decodes each payload item.
+                var messages = proto.Data.Select(byteArr => _codec.Decode(byteArr));
+                message.Data.AddRange(messages);
+                return message;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs
new file mode 100644
index 0000000..fd03026
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Wake.Remote;
+using ProtoBuf;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService.Codec
+{
+    /// <summary>
+    /// Protobuf contract for an NsMessage: source and destination identifiers
+    /// as strings plus the list of encoded payload items.
+    /// </summary>
+    [ProtoContract]
+    public class NsMessageProto
+    {
+        /// <summary>
+        /// Create an NsMessageProto with an empty payload list.
+        /// </summary>
+        public NsMessageProto()
+        {
+            Data = new List<byte[]>();
+        }
+
+        /// <summary>
+        /// String form of the message's source identifier.
+        /// </summary>
+        [ProtoMember(1)]
+        public string SourceId { get; set; }
+
+        /// <summary>
+        /// String form of the message's destination identifier.
+        /// </summary>
+        [ProtoMember(2)]
+        public string DestId { get; set; }
+
+        /// <summary>
+        /// The encoded payload items, one byte array per item.
+        /// </summary>
+        [ProtoMember(3)]
+        public List<byte[]> Data { get; set; }
+
+        /// <summary>
+        /// Build the proto for a message: identifiers are converted with
+        /// ToString and every payload item is encoded with the given codec.
+        /// </summary>
+        /// <typeparam name="T">The payload item type</typeparam>
+        /// <param name="message">The message to convert</param>
+        /// <param name="codec">The codec used to encode each payload item</param>
+        /// <returns>The populated NsMessageProto</returns>
+        public static NsMessageProto Create<T>(NsMessage<T> message, ICodec<T> codec)
+        {
+            var result = new NsMessageProto
+            {
+                SourceId = message.SourceId.ToString(),
+                DestId = message.DestId.ToString()
+            };
+
+            foreach (T payload in message.Data)
+            {
+                byte[] encoded = codec.Encode(payload);
+                result.Data.Add(encoded);
+            }
+
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs
new file mode 100644
index 0000000..bd05d56
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.IO.Network.NetworkService
+{
+    /// <summary>
+    /// Control message kinds declared alongside the network service.
+    /// NOTE(review): presumably exchanged by the group communication layer
+    /// (the STOP member mentions it) - confirm against the callers.
+    /// </summary>
+    public enum ControlMessage
+    {
+        /// <summary>
+        /// Default state; this is the enum's zero value.
+        /// </summary>
+        UNDEFINED = 0,
+
+        /// <summary>
+        /// Expecting data to be sent/received
+        /// </summary>
+        RECEIVE = 1,
+
+        /// <summary>
+        /// Stop group communications
+        /// </summary>
+        STOP = 2,
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs
new file mode 100644
index 0000000..8a09934
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService
+{
+    /// <summary>
+    /// Represents a connection between two endpoints named by identifiers.
+    /// The interface extends IDisposable, so every implementation also
+    /// provides Dispose().
+    /// </summary>
+    /// <typeparam name="T">The type of object written to the connection</typeparam>
+    public interface IConnection<T> : IDisposable
+    {
+        /// <summary>
+        /// Opens the connection.
+        /// NOTE(review): the interface itself does not state whether Open must
+        /// precede Write; implementations may require it - confirm per implementation.
+        /// </summary>
+        void Open();
+
+        /// <summary>
+        /// Writes the object to the connection
+        /// </summary>
+        /// <param name="obj">The message to send</param>
+        void Write(T obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs
new file mode 100644
index 0000000..bdd0ac9
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.Services;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService
+{
+    /// <summary>
+    /// Network service used for Reef Task communication.
+    /// Combines the IService and IDisposable contracts with connection
+    /// creation and identifier registration.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public interface INetworkService<T> : IService, IDisposable
+    {
+        /// <summary>
+        /// Name client for registering ids (read-only from the caller's side)
+        /// </summary>
+        INameClient NamingClient { get; }
+
+        /// <summary>
+        /// Open a new connection to the remote host registered to
+        /// the name service with the given identifier
+        /// </summary>
+        /// <param name="destinationId">The identifier of the remote host</param>
+        /// <returns>The IConnection used for communication</returns>
+        IConnection<T> NewConnection(IIdentifier destinationId);
+
+        /// <summary>
+        /// Register the identifier for the NetworkService with the NameService.
+        /// </summary>
+        /// <param name="id">The identifier to register</param>
+        void Register(IIdentifier id);
+
+        /// <summary>
+        /// Unregister the identifier for the NetworkService with the NameService.
+        /// Takes no argument: the identifier to remove is whichever one the
+        /// service currently holds.
+        /// </summary>
+        void Unregister();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs
new file mode 100644
index 0000000..89e4151
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Reactive;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming;
+using Org.Apache.Reef.IO.Network.NetworkService.Codec;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Tang.Exceptions;
+using Org.Apache.Reef.Wake;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Impl;
+using Org.Apache.Reef.Wake.Util;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService
+{
+    /// <summary>
+    /// Network service used for Reef Task communication.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class NetworkService<T> : INetworkService<T>
+    {
+        // Shared, immutable logger (same declaration style as NsConnection).
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(NetworkService<>));
+
+        private readonly IRemoteManager<NsMessage<T>> _remoteManager;
+        private readonly IObserver<NsMessage<T>> _messageHandler;
+        private readonly ICodec<NsMessage<T>> _codec;
+        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+
+        // Set by Register, cleared by Unregister.
+        private IIdentifier _localIdentifier;
+
+        // Subscription of _messageHandler with the remote manager; null while no
+        // handler is registered.
+        private IDisposable _messageHandlerDisposable;
+
+        /// <summary>
+        /// Create a new NetworkService.
+        /// </summary>
+        /// <param name="nsPort">The port that the NetworkService will listen on</param>
+        /// <param name="nameServerAddr">The address of the NameServer</param>
+        /// <param name="nameServerPort">The port of the NameServer</param>
+        /// <param name="messageHandler">The observer to handle incoming messages</param>
+        /// <param name="idFactory">The factory used to create IIdentifiers</param>
+        /// <param name="codec">The codec used for serialization</param>
+        [Inject]
+        public NetworkService(
+            [Parameter(typeof(NetworkServiceOptions.NetworkServicePort))] int nsPort,
+            [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string nameServerAddr,
+            [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int nameServerPort,
+            IObserver<NsMessage<T>> messageHandler,
+            IIdentifierFactory idFactory,
+            ICodec<T> codec)
+        {
+            _codec = new NsMessageCodec<T>(codec, idFactory);
+
+            IPAddress localAddress = NetworkUtils.LocalIPAddress;
+            _remoteManager = new DefaultRemoteManager<NsMessage<T>>(localAddress, nsPort, _codec);
+            _messageHandler = messageHandler;
+
+            NamingClient = new NameClient(nameServerAddr, nameServerPort);
+            _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
+
+            LOGGER.Log(Level.Info, "Started network service");
+        }
+
+        /// <summary>
+        /// Name client for registering ids
+        /// </summary>
+        public INameClient NamingClient { get; private set; }
+
+        /// <summary>
+        /// Open a new connection to the remote host registered to
+        /// the name service with the given identifier. A connection already
+        /// cached for that identifier is returned instead of creating a new one.
+        /// </summary>
+        /// <param name="destinationId">The identifier of the remote host</param>
+        /// <returns>The IConnection used for communication</returns>
+        /// <exception cref="IllegalStateException">If no local identifier has
+        /// been registered yet</exception>
+        public IConnection<T> NewConnection(IIdentifier destinationId)
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot open connection without first registering an ID");
+            }
+
+            IConnection<T> connection;
+            if (_connectionMap.TryGetValue(destinationId, out connection))
+            {
+                return connection;
+            }
+
+            // The connection receives the map so it can remove itself on Dispose.
+            connection = new NsConnection<T>(_localIdentifier, destinationId, 
+                NamingClient, _remoteManager, _connectionMap);
+
+            _connectionMap[destinationId] = connection;
+            return connection;
+        }
+
+        /// <summary>
+        /// Register the identifier for the NetworkService with the NameService
+        /// and subscribe the incoming message handler.
+        /// </summary>
+        /// <param name="id">The identifier to register</param>
+        public void Register(IIdentifier id)
+        {
+            LOGGER.Log(Level.Info, "Registering id {0} with network service.", id);
+
+            _localIdentifier = id;
+            NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
+
+            // Drop a subscription left over from an earlier Register call so it
+            // is not leaked when the field is overwritten below.
+            ReleaseMessageHandler();
+
+            // Create and register incoming message handler
+            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
+        }
+
+        /// <summary>
+        /// Unregister the identifier for the NetworkService with the NameService
+        /// and release the incoming message handler subscription.
+        /// </summary>
+        /// <exception cref="IllegalStateException">If no identifier is
+        /// currently registered</exception>
+        public void Unregister()
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot unregister a non existant identifier");
+            }
+
+            NamingClient.Unregister(_localIdentifier.ToString());
+            _localIdentifier = null;
+
+            // Null-safe: the subscription is absent if Register failed before
+            // the handler was registered.
+            ReleaseMessageHandler();
+        }
+
+        /// <summary>
+        /// Dispose of the NetworkService's resources
+        /// </summary>
+        public void Dispose()
+        {
+            // Release the handler subscription first; it is a no-op when
+            // Unregister already released it.
+            ReleaseMessageHandler();
+
+            NamingClient.Dispose();
+            _remoteManager.Dispose();
+
+            LOGGER.Log(Level.Info, "Disposed of network service");
+        }
+
+        /// <summary>
+        /// Disposes the incoming message handler subscription, if any, and
+        /// clears the field so the subscription is never disposed twice.
+        /// </summary>
+        private void ReleaseMessageHandler()
+        {
+            if (_messageHandlerDisposable != null)
+            {
+                _messageHandlerDisposable.Dispose();
+                _messageHandlerDisposable = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs
new file mode 100644
index 0000000..54da9cb
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.IO.Network.Naming;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Util;
+using Org.Apache.Reef.Wake.Remote;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService
+{
+    /// <summary>
+    /// Configuration module builder for the NetworkService. Exposes the
+    /// parameters a client must supply and binds them, together with the
+    /// NameServer port and address taken from NamingConfiguration, into a
+    /// ConfigurationModule.
+    /// </summary>
+    public class NetworkServiceConfiguration : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// Required value bound to NetworkServiceOptions.NetworkServicePort
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly RequiredParameter<int> NetworkServicePort = new RequiredParameter<int>();
+
+        /// <summary>
+        /// Required implementation bound to ICodecFactory
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly RequiredImpl<ICodecFactory> NetworkServiceCodecFactory = new RequiredImpl<ICodecFactory>();
+
+        /// <summary>
+        /// Builds the configuration module. A fresh module is built on every access.
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule
+        {
+            get
+            {
+                // Each step binds one option; the binding order matches the original chain.
+                var withServicePort = new NetworkServiceConfiguration().BindNamedParameter(
+                    GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
+                    NetworkServicePort);
+
+                var withNameServerPort = withServicePort.BindNamedParameter(
+                    GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+                    NamingConfiguration.NameServerPort);
+
+                var withNameServerAddress = withNameServerPort.BindNamedParameter(
+                    GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+                    NamingConfiguration.NameServerAddress);
+
+                var withCodecFactory = withNameServerAddress.BindImplementation(
+                    GenericType<ICodecFactory>.Class,
+                    NetworkServiceCodecFactory);
+
+                return withCodecFactory.Build();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
new file mode 100644
index 0000000..e41e590
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake;
+using Org.Apache.Reef.Wake.Remote;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService
+{
+    /// <summary>
+    /// Container for the named parameters of the NetworkService.
+    /// </summary>
+    public class NetworkServiceOptions
+    {
+        /// <summary>
+        /// Named parameter of type int for the NetworkService port.
+        /// NOTE(review): the attribute arguments look like description,
+        /// short name ("NsPort") and default value ("0") - confirm against
+        /// the NamedParameter attribute's constructor.
+        /// </summary>
+        [NamedParameter("Port of NetworkService", "NsPort", "0")]
+        public class NetworkServicePort : Name<int>
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs
new file mode 100644
index 0000000..1752e82
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Exceptions;
+using Org.Apache.Reef.Wake;
+using Org.Apache.Reef.Wake.Remote;
+
+namespace Org.Apache.Reef.IO.Network.NetworkService
+{
+    /// <summary>
+    /// Represents a connection between two hosts using the NetworkService.
+    /// </summary>
+    /// <typeparam name="T">The type of message written to the connection</typeparam>
+    public class NsConnection<T> : IConnection<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(NsConnection<T>));
+
+        private readonly IIdentifier _sourceId;
+        private readonly IIdentifier _destId;
+        private readonly INameClient _nameClient;
+        private readonly IRemoteManager<NsMessage<T>> _remoteManager;
+        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+
+        // Null until Open succeeds; Write refuses to run while it is null.
+        private IObserver<NsMessage<T>> _remoteSender;
+
+        /// <summary>
+        /// Creates a new NsConnection between two hosts.
+        /// </summary>
+        /// <param name="sourceId">The identifier of the sender</param>
+        /// <param name="destId">The identifier of the receiver</param>
+        /// <param name="nameClient">The NameClient used for naming lookup</param>
+        /// <param name="remoteManager">The remote manager used for network communication</param>
+        /// <param name="connectionMap">A cache of opened connections.  Will remove itself from
+        /// the cache when the NsConnection is disposed.</param>
+        public NsConnection(
+            IIdentifier sourceId, 
+            IIdentifier destId, 
+            INameClient nameClient,
+            IRemoteManager<NsMessage<T>> remoteManager,
+            Dictionary<IIdentifier, IConnection<T>> connectionMap)
+        {
+            _sourceId = sourceId;
+            _destId = destId;
+            _nameClient = nameClient;
+            _remoteManager = remoteManager;
+            _connectionMap = connectionMap;
+        }
+
+        /// <summary>
+        /// Opens the connection to the remote host: looks up the destination
+        /// identifier with the name client and obtains a remote observer for
+        /// the resulting address.
+        /// </summary>
+        /// <exception cref="RemotingException">If the name lookup returns no
+        /// address for the destination identifier</exception>
+        /// <exception cref="SocketException">Logged and rethrown if obtaining
+        /// the remote observer fails with it</exception>
+        /// <exception cref="ObjectDisposedException">Logged and rethrown if
+        /// obtaining the remote observer fails with it</exception>
+        public void Open()
+        {
+            string destStr = _destId.ToString();
+            LOGGER.Log(Level.Verbose, "Network service opening connection to {0}...", destStr);
+
+            IPEndPoint destAddr = _nameClient.Lookup(destStr);
+            if (destAddr == null)
+            {
+                // This is a lookup failure, not a registration failure; name
+                // the identifier so the error is actionable.
+                throw new RemotingException("Cannot find Identifier " + destStr + " in NameService");
+            }
+
+            try
+            {
+                _remoteSender = _remoteManager.GetRemoteObserver(destAddr);
+                LOGGER.Log(Level.Verbose, "Network service completed connection to {0}.", destStr);
+            }
+            catch (SocketException)
+            {
+                LOGGER.Log(Level.Error, "Network Service cannot open connection to {0}", destAddr);
+                throw;
+            }
+            catch (ObjectDisposedException)
+            {
+                LOGGER.Log(Level.Error, "Network Service cannot open connection to {0}", destAddr);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Writes the object to the remote host, wrapped in an NsMessage that
+        /// carries the source and destination identifiers.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <exception cref="IllegalStateException">If the connection has not
+        /// been opened yet</exception>
+        /// <exception cref="IOException">Logged and rethrown if sending fails
+        /// with it</exception>
+        /// <exception cref="ObjectDisposedException">Logged and rethrown if
+        /// sending fails with it</exception>
+        public void Write(T message)
+        {
+            if (_remoteSender == null)
+            {
+                throw new IllegalStateException("NsConnection has not been opened yet."); 
+            }
+
+            try
+            {
+                _remoteSender.OnNext(new NsMessage<T>(_sourceId, _destId, message));
+            }
+            catch (IOException)
+            {
+                LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
+                throw;
+            }
+            catch (ObjectDisposedException)
+            {
+                LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Removes the entry for this connection's destination identifier from
+        /// the shared connection cache. The remote sender itself is not
+        /// closed here.
+        /// </summary>
+        public void Dispose()
+        {
+            _connectionMap.Remove(_destId);
+        }
+    }
+}