You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/06 02:32:58 UTC
[2/5] incubator-reef git commit: [REEF-139] Changing .Net project
structure for Network and Evaluator
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs
deleted file mode 100644
index 806f1d0..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/NameClient.cs
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming.Codec;
-using Org.Apache.REEF.IO.Network.Naming.Events;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.RX;
-using Org.Apache.REEF.Wake.RX.Impl;
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Reactive;
-
-namespace Org.Apache.REEF.IO.Network.Naming
-{
- /// <summary>
- /// Client for the REEF name service.
- /// Registers, unregisters, and looks up the IPEndPoints of known hosts by
- /// exchanging NamingEvent messages with a NameServer over a TransportClient.
- /// </summary>
- public class NameClient : INameClient
- {
-     private static readonly Logger _logger = Logger.GetLogger(typeof(NameClient));
-
-     // One queue per response type. The handler built in CreateClientHandler adds
-     // every incoming response to the matching queue; the helper clients read them.
-     private BlockingCollection<NamingLookupResponse> _lookupResponseQueue;
-     private BlockingCollection<NamingGetAllResponse> _getAllResponseQueue;
-     private BlockingCollection<NamingRegisterResponse> _registerResponseQueue;
-     private BlockingCollection<NamingUnregisterResponse> _unregisterResponseQueue;
-
-     private TransportClient<NamingEvent> _client;
-
-     private NameLookupClient _lookupClient;
-     private NameRegisterClient _registerClient;
-
-     private bool _disposed;
-
-     /// <summary>
-     /// Constructs a NameClient to register, lookup, and unregister IPEndpoints
-     /// with the NameServer.
-     /// </summary>
-     /// <param name="remoteAddress">The ip address of the NameServer; parsed with IPAddress.Parse</param>
-     /// <param name="remotePort">The port of the NameServer</param>
-     [Inject]
-     public NameClient(
-         [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string remoteAddress,
-         [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int remotePort)
-     {
-         IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse(remoteAddress), remotePort);
-         Initialize(remoteEndpoint);
-         _disposed = false;
-     }
-
-     /// <summary>
-     /// Constructs a NameClient to register, lookup, and unregister IPEndpoints
-     /// with the NameServer.
-     /// </summary>
-     /// <param name="remoteEndpoint">The endpoint of the NameServer</param>
-     public NameClient(IPEndPoint remoteEndpoint)
-     {
-         Initialize(remoteEndpoint);
-         _disposed = false;
-     }
-
-     /// <summary>
-     /// Registers the identifier with the NameService and waits for the
-     /// register response. Overwrites the previous mapping if the identifier
-     /// has already been registered.
-     /// </summary>
-     /// <param name="id">The key used to map the remote endpoint</param>
-     /// <param name="endpoint">The endpoint to map</param>
-     public void Register(string id, IPEndPoint endpoint)
-     {
-         if (id == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("id"), _logger);
-         }
-         if (endpoint == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("endpoint"), _logger);
-         }
-
-         _logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint);
-         _registerClient.Register(id, endpoint);
-     }
-
-     /// <summary>
-     /// Asks the NameService to unregister the identifier.
-     /// NOTE(review): NameRegisterClient.Unregister only sends the request and
-     /// does not wait for a response, so the mapping may still exist on the
-     /// server when this method returns.
-     /// </summary>
-     /// <param name="id">The identifier to unregister</param>
-     public void Unregister(string id)
-     {
-         if (id == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("id"), _logger);
-         }
-
-         _logger.Log(Level.Info, "Unregistering id: " + id);
-         _registerClient.Unregister(id);
-     }
-
-     /// <summary>
-     /// Synchronously looks up the IPEndpoint for the registered identifier.
-     /// </summary>
-     /// <param name="id">The identifier to look up</param>
-     /// <returns>The endpoint of the first returned assignment, or null if
-     /// the lookup returned no assignments</returns>
-     public IPEndPoint Lookup(string id)
-     {
-         if (id == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("id"), _logger);
-         }
-
-         List<NameAssignment> assignments = Lookup(new List<string> { id });
-         if (assignments != null && assignments.Count > 0)
-         {
-             return assignments.First().Endpoint;
-         }
-
-         return null;
-     }
-
-     /// <summary>
-     /// Synchronously looks up the IPEndpoint for each of the identifiers in the list.
-     /// </summary>
-     /// <param name="ids">The list of identifiers to look up; must be non-null and non-empty</param>
-     /// <returns>The list of NameAssignments returned by the NameServer, each pairing an
-     /// identifier with its mapped IPEndpoint.
-     /// NOTE(review): NameServer.Lookup in this code base leaves unregistered
-     /// identifiers out of its result instead of returning them with a null
-     /// endpoint, so the list may be shorter than <paramref name="ids"/>.</returns>
-     public List<NameAssignment> Lookup(List<string> ids)
-     {
-         if (ids == null || ids.Count == 0)
-         {
-             // ArgumentNullException(string) treats its argument as the parameter
-             // name, so pass the name and the message separately.
-             Exceptions.Throw(new ArgumentNullException("ids", "ids cannot be null or empty"), _logger);
-         }
-
-         _logger.Log(Level.Verbose, "Looking up ids");
-         List<NameAssignment> assignments = _lookupClient.Lookup(ids);
-         if (assignments != null)
-         {
-             return assignments;
-         }
-         Exceptions.Throw(new WakeRuntimeException("NameClient failed to look up ids."), _logger);
-         return null; // Only here to satisfy the compiler; the line above is expected to throw.
-     }
-
-     /// <summary>
-     /// Restart the name client in case of failure: disposes the current
-     /// transport client and connects to the given endpoint with fresh
-     /// response queues and helper clients.
-     /// </summary>
-     /// <param name="serverEndpoint">The new server endpoint to connect to</param>
-     public void Restart(IPEndPoint serverEndpoint)
-     {
-         // Validate before tearing down the current connection so that a bad
-         // argument does not leave this client with a disposed transport.
-         if (serverEndpoint == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("serverEndpoint"), _logger);
-         }
-
-         _client.Dispose();
-         Initialize(serverEndpoint);
-     }
-
-     /// <summary>
-     /// Releases resources used by NameClient
-     /// </summary>
-     public void Dispose()
-     {
-         Dispose(true);
-         GC.SuppressFinalize(this);
-     }
-
-     /// <summary>
-     /// Disposes the transport client the first time it is called with
-     /// <paramref name="disposing"/> set; later calls are no-ops.
-     /// </summary>
-     /// <param name="disposing">True when called from Dispose()</param>
-     protected virtual void Dispose(bool disposing)
-     {
-         if (_disposed)
-         {
-             return;
-         }
-         if (disposing)
-         {
-             _client.Dispose();
-         }
-         _disposed = true;
-     }
-
-     /// <summary>
-     /// Create a new transport client connected to the NameServer at the given remote endpoint,
-     /// together with new response queues and the lookup/register helpers that use them.
-     /// </summary>
-     /// <param name="serverEndpoint">The NameServer endpoint to connect to.</param>
-     private void Initialize(IPEndPoint serverEndpoint)
-     {
-         _lookupResponseQueue = new BlockingCollection<NamingLookupResponse>();
-         _getAllResponseQueue = new BlockingCollection<NamingGetAllResponse>();
-         _registerResponseQueue = new BlockingCollection<NamingRegisterResponse>();
-         _unregisterResponseQueue = new BlockingCollection<NamingUnregisterResponse>();
-
-         IObserver<TransportEvent<NamingEvent>> clientHandler = CreateClientHandler();
-         ICodec<NamingEvent> codec = CreateClientCodec();
-         _client = new TransportClient<NamingEvent>(serverEndpoint, codec, clientHandler);
-
-         _lookupClient = new NameLookupClient(_client, _lookupResponseQueue, _getAllResponseQueue);
-         _registerClient = new NameRegisterClient(_client, _registerResponseQueue, _unregisterResponseQueue);
-     }
-
-     /// <summary>
-     /// Create handler to handle async responses from the NameServer.
-     /// Each response type is routed to its own queue.
-     /// </summary>
-     /// <returns>The client handler to manage responses from the NameServer</returns>
-     private IObserver<TransportEvent<NamingEvent>> CreateClientHandler()
-     {
-         PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>();
-         subject.Subscribe(Observer.Create<NamingLookupResponse>(msg => HandleResponse(_lookupResponseQueue, msg)));
-         subject.Subscribe(Observer.Create<NamingGetAllResponse>(msg => HandleResponse(_getAllResponseQueue, msg)));
-         subject.Subscribe(Observer.Create<NamingRegisterResponse>(msg => HandleResponse(_registerResponseQueue, msg)));
-         subject.Subscribe(Observer.Create<NamingUnregisterResponse>(msg => HandleResponse(_unregisterResponseQueue, msg)));
-         return new ClientObserver(subject);
-     }
-
-     /// <summary>
-     /// Create the codec used to serialize/deserialize NamingEvent messages.
-     /// NOTE(review): no codec is registered here for the GetAll request/response
-     /// or for the unregister response, although handlers for those responses
-     /// are subscribed above - confirm whether that is intentional.
-     /// </summary>
-     /// <returns>The serialization codec</returns>
-     private ICodec<NamingEvent> CreateClientCodec()
-     {
-         MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>();
-         codec.Register(new NamingLookupRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupRequest");
-         codec.Register(new NamingLookupResponseCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupResponse");
-         NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec();
-         codec.Register(requestCodec, "org.apache.reef.io.network.naming.serialization.NamingRegisterRequest");
-         codec.Register(new NamingRegisterResponseCodec(requestCodec), "org.apache.reef.io.network.naming.serialization.NamingRegisterResponse");
-         codec.Register(new NamingUnregisterRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest");
-         return codec;
-     }
-
-     /// <summary>
-     /// Adds a received response to the queue for its type.
-     /// </summary>
-     private void HandleResponse<T>(BlockingCollection<T> queue, T message)
-     {
-         queue.Add(message);
-     }
-
-     /// <summary>
-     /// Helper class used to handle response events from the NameServer.
-     /// Unwraps the NamingEvent from the transport event, attaches the link it
-     /// arrived on, and forwards it to the wrapped handler.
-     /// </summary>
-     private class ClientObserver : AbstractObserver<TransportEvent<NamingEvent>>
-     {
-         private readonly IObserver<NamingEvent> _handler;
-
-         public ClientObserver(IObserver<NamingEvent> handler)
-         {
-             _handler = handler;
-         }
-
-         public override void OnNext(TransportEvent<NamingEvent> value)
-         {
-             NamingEvent message = value.Data;
-             message.Link = value.Link;
-             _handler.OnNext(message);
-         }
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs
deleted file mode 100644
index 7499734..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/NameLookupClient.cs
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Threading;
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming.Events;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.IO.Network.Naming
-{
- /// <summary>
- /// Helper class to send lookup events to the name server
- /// </summary>
- internal class NameLookupClient
- {
-     private readonly TransportClient<NamingEvent> _client;
-     private readonly BlockingCollection<NamingLookupResponse> _lookupResponseQueue;
-     private readonly BlockingCollection<NamingGetAllResponse> _getAllResponseQueue;
-
-     /// <summary>
-     /// Constructs a new NameLookupClient.
-     /// </summary>
-     /// <param name="client">The transport client used to connect to the NameServer</param>
-     /// <param name="lookupQueue">The queue used to signal that a response
-     /// has been received from the NameServer</param>
-     /// <param name="getAllQueue">The queue used to signal that a GetAllResponse
-     /// has been received from the NameServer</param>
-     public NameLookupClient(TransportClient<NamingEvent> client,
-                             BlockingCollection<NamingLookupResponse> lookupQueue,
-                             BlockingCollection<NamingGetAllResponse> getAllQueue)
-     {
-         _client = client;
-         _lookupResponseQueue = lookupQueue;
-         _getAllResponseQueue = getAllQueue;
-     }
-
-     /// <summary>
-     /// Look up the IPEndPoint that has been registered with the NameServer using
-     /// the given identifier as the key.
-     /// </summary>
-     /// <param name="id">The id for the IPEndPoint</param>
-     /// <param name="token">The cancellation token used for timeout</param>
-     /// <returns>The registered IPEndpoint, or null if the identifier has not
-     /// been registered with the NameServer or if the operation times out.</returns>
-     public IPEndPoint Lookup(string id, CancellationToken token)
-     {
-         // Do not send a request whose response nobody is going to read.
-         if (token.IsCancellationRequested)
-         {
-             return null;
-         }
-
-         _client.Send(new NamingLookupRequest(new List<string> { id }));
-
-         NamingLookupResponse response;
-         try
-         {
-             // Honour the token so the wait ends when the caller's timeout fires.
-             response = _lookupResponseQueue.Take(token);
-         }
-         catch (System.OperationCanceledException)
-         {
-             // NOTE(review): a response that arrives after cancellation stays in the
-             // queue and would be read by the next lookup; responses carry no
-             // request correlation in this class.
-             return null;
-         }
-
-         List<NameAssignment> assignment = response.NameAssignments;
-         return (assignment == null || assignment.Count == 0) ? null : assignment.First().Endpoint;
-     }
-
-     /// <summary>
-     /// Look up IPEndPoints that have been registered with the NameService.
-     /// Sends the request and blocks until a lookup response is available.
-     /// </summary>
-     /// <param name="ids">The list of ids to look up</param>
-     /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint
-     /// pairs</returns>
-     public List<NameAssignment> Lookup(List<string> ids)
-     {
-         _client.Send(new NamingLookupRequest(ids));
-         NamingLookupResponse response = _lookupResponseQueue.Take();
-         return response.NameAssignments;
-     }
-
-     /// <summary>
-     /// Synchronously gets all of the identifier/IPEndpoint pairs registered with the NameService.
-     /// Sends the request and blocks until a GetAll response is available.
-     /// </summary>
-     /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint
-     /// pairs</returns>
-     public List<NameAssignment> GetAll()
-     {
-         _client.Send(new NamingGetAllRequest());
-         NamingGetAllResponse response = _getAllResponseQueue.Take();
-         return response.Assignments;
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs
deleted file mode 100644
index 88dc3c0..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/NameRegisterClient.cs
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Concurrent;
-using System.Net;
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming.Events;
-using Org.Apache.REEF.Wake.Remote.Impl;
-
-namespace Org.Apache.REEF.IO.Network.Naming
-{
- /// <summary>
- /// Helper class to send register and unregister events to the NameServer.
- /// </summary>
- internal class NameRegisterClient
- {
-     private readonly TransportClient<NamingEvent> _client;
-     private readonly BlockingCollection<NamingRegisterResponse> _registerResponseQueue;
-
-     // NOTE(review): stored but never read in this class - Unregister does not
-     // wait for a response.
-     private readonly BlockingCollection<NamingUnregisterResponse> _unregisterResponseQueue;
-
-     /// <summary>
-     /// Constructs a new NameRegisterClient.
-     /// </summary>
-     /// <param name="client">The transport client used to send requests</param>
-     /// <param name="registerQueue">The queue from which register responses are taken</param>
-     /// <param name="unregisterQueue">The queue for unregister responses</param>
-     public NameRegisterClient(TransportClient<NamingEvent> client,
-                               BlockingCollection<NamingRegisterResponse> registerQueue,
-                               BlockingCollection<NamingUnregisterResponse> unregisterQueue)
-     {
-         _client = client;
-         _registerResponseQueue = registerQueue;
-         _unregisterResponseQueue = unregisterQueue;
-     }
-
-     /// <summary>
-     /// Synchronously register the id and endpoint with the NameServer:
-     /// sends the request and blocks until a register response is available.
-     /// </summary>
-     /// <param name="id">The identifier</param>
-     /// <param name="endpoint">The endpoint</param>
-     public void Register(string id, IPEndPoint endpoint)
-     {
-         NameAssignment assignment = new NameAssignment(id, endpoint);
-         _client.Send(new NamingRegisterRequest(assignment));
-         _registerResponseQueue.Take();
-     }
-
-     /// <summary>
-     /// Sends an unregister request for the identifier to the NameServer.
-     /// Unlike Register, this does not wait for a response before returning.
-     /// </summary>
-     /// <param name="id">The identifier to unregister</param>
-     public void Unregister(string id)
-     {
-         _client.Send(new NamingUnregisterRequest(id));
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs
deleted file mode 100644
index 462e05a..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/NameServer.cs
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming.Codec;
-using Org.Apache.REEF.IO.Network.Naming.Events;
-using Org.Apache.REEF.IO.Network.Naming.Observers;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.RX;
-using Org.Apache.REEF.Wake.RX.Impl;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-
-namespace Org.Apache.REEF.IO.Network.Naming
-{
- /// <summary>
- /// Service that manages names and IPEndpoints for well known hosts.
- /// Can register, unregister, and look up IPEndPoints using a string identifier.
- /// </summary>
- public class NameServer : INameServer
- {
-     private static readonly Logger _logger = Logger.GetLogger(typeof(NameServer));
-
-     private readonly TransportServer<NamingEvent> _server;
-
-     // NOTE(review): a plain Dictionary with no locking. If the transport server
-     // delivers requests on more than one thread, access needs synchronising - confirm.
-     private readonly Dictionary<string, IPEndPoint> _idToAddrMap;
-
-     /// <summary>
-     /// Create a new NameServer to run on the specified port.
-     /// The transport server is started before the constructor returns.
-     /// </summary>
-     /// <param name="port">The port to listen for incoming connections on.</param>
-     [Inject]
-     public NameServer([Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port)
-     {
-         IObserver<TransportEvent<NamingEvent>> handler = CreateServerHandler();
-         _idToAddrMap = new Dictionary<string, IPEndPoint>();
-         ICodec<NamingEvent> codec = CreateServerCodec();
-
-         // Start transport server, get listening IP endpoint
-         _logger.Log(Level.Info, "Starting naming server");
-         _server = new TransportServer<NamingEvent>(port, handler, codec);
-         _server.Run();
-         LocalEndpoint = _server.LocalEndpoint;
-     }
-
-     /// <summary>
-     /// The endpoint reported by the transport server after it was started.
-     /// </summary>
-     public IPEndPoint LocalEndpoint { get; private set; }
-
-     /// <summary>
-     /// Looks up the IPEndpoints for each string identifier
-     /// </summary>
-     /// <param name="ids">The IDs to look up</param>
-     /// <returns>A list of Name assignments, in request order, for the identifiers
-     /// that are currently registered. Identifiers without a mapping are omitted.</returns>
-     public List<NameAssignment> Lookup(List<string> ids)
-     {
-         if (ids == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("ids"), _logger);
-         }
-
-         List<NameAssignment> assignments = new List<NameAssignment>(ids.Count);
-         foreach (string id in ids)
-         {
-             // Single hash lookup instead of ContainsKey followed by the indexer.
-             IPEndPoint endpoint;
-             if (_idToAddrMap.TryGetValue(id, out endpoint))
-             {
-                 assignments.Add(new NameAssignment(id, endpoint));
-             }
-         }
-
-         return assignments;
-     }
-
-     /// <summary>
-     /// Gets all of the registered identifier/endpoint pairs.
-     /// </summary>
-     /// <returns>A list of all of the registered identifiers and their
-     /// mapped IPEndpoints</returns>
-     public List<NameAssignment> GetAll()
-     {
-         return _idToAddrMap.Select(pair => new NameAssignment(pair.Key, pair.Value)).ToList();
-     }
-
-     /// <summary>
-     /// Registers the string identifier with the given IPEndpoint,
-     /// replacing any endpoint previously stored for that identifier.
-     /// </summary>
-     /// <param name="id">The string identifier</param>
-     /// <param name="endpoint">The mapped endpoint</param>
-     public void Register(string id, IPEndPoint endpoint)
-     {
-         if (id == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("id"), _logger);
-         }
-         if (endpoint == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("endpoint"), _logger);
-         }
-
-         _logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint);
-         _idToAddrMap[id] = endpoint;
-     }
-
-     /// <summary>
-     /// Unregister the given identifier with the NameServer.
-     /// Removing an identifier that is not registered has no effect.
-     /// </summary>
-     /// <param name="id">The identifier to unregister</param>
-     public void Unregister(string id)
-     {
-         if (id == null)
-         {
-             Exceptions.Throw(new ArgumentNullException("id"), _logger);
-         }
-
-         _logger.Log(Level.Info, "Unregistering id: " + id);
-         _idToAddrMap.Remove(id);
-     }
-
-     /// <summary>
-     /// Stops the NameServer by disposing the transport server.
-     /// </summary>
-     public void Dispose()
-     {
-         _server.Dispose();
-     }
-
-     /// <summary>
-     /// Create the handler to manage incoming NamingEvent types.
-     /// One observer is subscribed per request type; each receives this server.
-     /// </summary>
-     /// <returns>The server handler</returns>
-     private IObserver<TransportEvent<NamingEvent>> CreateServerHandler()
-     {
-         PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>();
-         subject.Subscribe(new NamingLookupRequestObserver(this));
-         subject.Subscribe(new NamingGetAllRequestObserver(this));
-         subject.Subscribe(new NamingRegisterRequestObserver(this));
-         subject.Subscribe(new NamingUnregisterRequestObserver(this));
-         return new ServerHandler(subject);
-     }
-
-     /// <summary>
-     /// Create the codec used to serialize/deserialize NamingEvent messages.
-     /// NOTE(review): no codec is registered for the GetAll request/response even
-     /// though a GetAll observer is subscribed - confirm whether that is intentional.
-     /// </summary>
-     /// <returns>The serialization codec</returns>
-     private ICodec<NamingEvent> CreateServerCodec()
-     {
-         MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>();
-         codec.Register(new NamingLookupRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupRequest");
-         codec.Register(new NamingLookupResponseCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupResponse");
-         NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec();
-         codec.Register(requestCodec, "org.apache.reef.io.network.naming.serialization.NamingRegisterRequest");
-         codec.Register(new NamingRegisterResponseCodec(requestCodec), "org.apache.reef.io.network.naming.serialization.NamingRegisterResponse");
-         codec.Register(new NamingUnregisterRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest");
-         return codec;
-     }
-
-     /// <summary>
-     /// Named parameter for the port the NameServer listens on.
-     /// NOTE(review): the constructor above binds
-     /// NamingConfigurationOptions.NameServerPort, not this class.
-     /// </summary>
-     [NamedParameter("Port for the NameServer to listen on")]
-     public class Port : Name<int>
-     {
-     }
-
-     /// <summary>
-     /// Class used to handle incoming NamingEvent messages.
-     /// Unwraps the NamingEvent, attaches the link it arrived on, and forwards
-     /// it to the wrapped handler.
-     /// </summary>
-     private class ServerHandler : AbstractObserver<TransportEvent<NamingEvent>>
-     {
-         private readonly IObserver<NamingEvent> _handler;
-
-         public ServerHandler(IObserver<NamingEvent> handler)
-         {
-             _handler = handler;
-         }
-
-         public override void OnNext(TransportEvent<NamingEvent> value)
-         {
-             NamingEvent message = value.Data;
-             message.Link = value.Link;
-             _handler.OnNext(message);
-         }
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs
deleted file mode 100644
index 3daac70..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfiguration.cs
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Util;
-
-namespace Org.Apache.REEF.Naming
-{
- /// <summary>
- /// Configuration module for the naming service. Exposes the name server
- /// address and port as required parameters and binds them to the matching
- /// named parameters in NamingConfigurationOptions.
- /// </summary>
- public class NamingConfiguration : ConfigurationModuleBuilder
- {
-     /// <summary>Required parameter bound to NamingConfigurationOptions.NameServerAddress.</summary>
-     [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
-     public static readonly RequiredParameter<string> NameServerAddress = new RequiredParameter<string>();
-
-     /// <summary>Required parameter bound to NamingConfigurationOptions.NameServerPort.</summary>
-     [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
-     public static readonly RequiredParameter<int> NameServerPort = new RequiredParameter<int>();
-
-     /// <summary>
-     /// Builds the configuration module. Every read constructs a new builder
-     /// and calls Build() on it.
-     /// </summary>
-     public static ConfigurationModule ConfigurationModule
-     {
-         get
-         {
-             var withAddress = new NamingConfiguration()
-                 .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerAddress>.Class, NameServerAddress);
-             var withPort = withAddress
-                 .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerPort>.Class, NameServerPort);
-             return withPort.Build();
-         }
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs
deleted file mode 100644
index aa9b6e6..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/NamingConfigurationOptions.cs
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Naming
-{
- /// <summary>
- /// Container for the named parameters used to configure the naming service.
- /// The nested classes carry no members; they serve as typed keys that are
- /// referenced via typeof(...) and GenericType&lt;...&gt;.Class elsewhere.
- /// </summary>
- public class NamingConfigurationOptions
- {
- /// <summary>
- /// Named parameter (string) holding the IP address of the NameServer.
- /// </summary>
- [NamedParameter("IP address of NameServer")]
- public class NameServerAddress : Name<string>
- {
- }
-
- /// <summary>
- /// Named parameter (int) holding the port of the NameServer.
- /// </summary>
- [NamedParameter("Port of NameServer")]
- public class NameServerPort : Name<int>
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs
deleted file mode 100644
index 92cc158..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingGetAllRequestObserver.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming.Events;
-using Org.Apache.REEF.Wake.RX;
-
-namespace Org.Apache.REEF.IO.Network.Naming.Observers
-{
- /// <summary>
- /// NameService handler for NamingGetAllRequest events.
- /// Answers each request with every identifier/IPEndpoint pair currently
- /// registered with the NameServer.
- /// </summary>
- internal class NamingGetAllRequestObserver : AbstractObserver<NamingGetAllRequest>
- {
-     private NameServer _server;
-
-     public NamingGetAllRequestObserver(NameServer server)
-     {
-         _server = server;
-     }
-
-     /// <summary>
-     /// Collect all registered assignments and write them back on the link
-     /// the request arrived on.
-     /// </summary>
-     /// <param name="value">The get-all request event</param>
-     public override void OnNext(NamingGetAllRequest value)
-     {
-         List<NameAssignment> registered = _server.GetAll();
-         var link = value.Link;
-         link.Write(new NamingGetAllResponse(registered));
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs
deleted file mode 100644
index 220aaa5..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingLookupRequestObserver.cs
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming.Events;
-using Org.Apache.REEF.Wake.RX;
-using System.Collections.Generic;
-
-namespace Org.Apache.REEF.IO.Network.Naming.Observers
-{
- /// <summary>
- /// Handler for looking up IPEndpoints registered with the NameServer
- /// </summary>
- internal class NamingLookupRequestObserver : AbstractObserver<NamingLookupRequest>
- {
- private NameServer _server;
-
- public NamingLookupRequestObserver(NameServer server)
- {
- _server = server;
- }
-
- /// <summary>
- /// Look up the IPEndpoints for the given identifiers and write them
- /// back to the NameClient
- /// </summary>
- /// <param name="value">The lookup request event</param>
- public override void OnNext(NamingLookupRequest value)
- {
- List<NameAssignment> assignments = _server.Lookup(value.Identifiers);
- value.Link.Write(new NamingLookupResponse(assignments));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs
deleted file mode 100644
index 2da45a1..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingRegisterRequestObserver.cs
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming.Events;
-using Org.Apache.REEF.Wake.RX;
-
-namespace Org.Apache.REEF.IO.Network.Naming.Observers
-{
- /// <summary>
- /// Handler for registering an identifier and endpoint with the Name Service
- /// </summary>
- internal class NamingRegisterRequestObserver : AbstractObserver<NamingRegisterRequest>
- {
- private NameServer _server;
-
- public NamingRegisterRequestObserver(NameServer server)
- {
- _server = server;
- }
-
- /// <summary>
- /// Register the identifier and IPEndpoint with the NameServer and send
- /// the response back to the NameClient
- /// </summary>
- /// <param name="value">The register request event</param>
- public override void OnNext(NamingRegisterRequest value)
- {
- NameAssignment assignment = value.NameAssignment;
- _server.Register(assignment.Identifier, assignment.Endpoint);
-
- value.Link.Write(new NamingRegisterResponse(value));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs b/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs
deleted file mode 100644
index 17d2d50..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Naming/Observers/NamingUnregisterRequestObserver.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.IO.Network.Naming.Events;
-using Org.Apache.REEF.Wake.RX;
-
-namespace Org.Apache.REEF.IO.Network.Naming.Observers
-{
- /// <summary>
- /// Handler for unregistering an identifier with the NameServer
- /// </summary>
- internal class NamingUnregisterRequestObserver : AbstractObserver<NamingUnregisterRequest>
- {
- private NameServer _server;
-
- public NamingUnregisterRequestObserver(NameServer server)
- {
- _server = server;
- }
-
- /// <summary>
- /// Unregister the identifer with the NameServer.
- /// </summary>
- /// <param name="value">The unregister request event</param>
- public override void OnNext(NamingUnregisterRequest value)
- {
- // Don't send a response
- _server.Unregister(value.Identifier);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/Network.csproj b/lang/cs/Source/REEF/reef-io/Network/Network.csproj
deleted file mode 100644
index fd46373..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/Network.csproj
+++ /dev/null
@@ -1,180 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
- <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
- <PropertyGroup>
- <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
- <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProjectGuid>{883CE800-6A6A-4E0A-B7FE-C054F4F2C1DC}</ProjectGuid>
- <OutputType>Library</OutputType>
- <AppDesignerFolder>Properties</AppDesignerFolder>
- <RootNamespace>Org.Apache.Reef.IO.Network</RootNamespace>
- <AssemblyName>Org.Apache.Reef.IO.Network</AssemblyName>
- <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
- <FileAlignment>512</FileAlignment>
- <RestorePackages>true</RestorePackages>
- <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\..\..</SolutionDir>
- </PropertyGroup>
- <Import Project="$(SolutionDir)\Source\build.props" />
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
- <PlatformTarget>AnyCPU</PlatformTarget>
- <DebugSymbols>true</DebugSymbols>
- <DebugType>full</DebugType>
- <Optimize>false</Optimize>
- <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
- <DefineConstants>DEBUG;TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
- <PlatformTarget>AnyCPU</PlatformTarget>
- <DebugSymbols>true</DebugSymbols>
- <DebugType>full</DebugType>
- <Optimize>false</Optimize>
- <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
- <DefineConstants>DEBUG;TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
- <PlatformTarget>AnyCPU</PlatformTarget>
- <DebugType>pdbonly</DebugType>
- <Optimize>true</Optimize>
- <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
- <DefineConstants>TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
- <PlatformTarget>AnyCPU</PlatformTarget>
- <DebugType>pdbonly</DebugType>
- <Optimize>true</Optimize>
- <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
- <DefineConstants>TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <ItemGroup>
- <Reference Include="Microsoft.Hadoop.Avro">
- <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath>
- </Reference>
- <Reference Include="Newtonsoft.Json">
- <HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath>
- </Reference>
- <Reference Include="protobuf-net">
- <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath>
- </Reference>
- <Reference Include="System.Reactive.Core">
- <HintPath>$(PackagesDir)\Rx-Core.$(RxVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
- </Reference>
- <Reference Include="System.Reactive.Interfaces">
- <HintPath>$(PackagesDir)\Rx-Interfaces.$(RxVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
- </Reference>
- <Reference Include="System" />
- <Reference Include="System.Core" />
- <Reference Include="System.Runtime.Serialization" />
- <Reference Include="System.Xml.Linq" />
- <Reference Include="System.Data.DataSetExtensions" />
- <Reference Include="Microsoft.CSharp" />
- <Reference Include="System.Data" />
- <Reference Include="System.Xml" />
- </ItemGroup>
- <ItemGroup>
- <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" />
- <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" />
- <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" />
- <Compile Include="Naming\Codec\NamingRegisterResponseCodec.cs" />
- <Compile Include="Naming\Codec\NamingUnregisterRequestCodec.cs" />
- <Compile Include="Naming\Contracts\AvroNamingAssignment.cs" />
- <Compile Include="Naming\Contracts\AvroNamingLookupRequest.cs" />
- <Compile Include="Naming\Contracts\AvroNamingLookupResponse.cs" />
- <Compile Include="Naming\Contracts\AvroNamingRegisterRequest.cs" />
- <Compile Include="Naming\Contracts\AvroNamingUnRegisterRequest.cs" />
- <Compile Include="Naming\Events\NamingEvent.cs" />
- <Compile Include="Naming\Events\NamingGetAllRequest.cs" />
- <Compile Include="Naming\Events\NamingGetAllResponse.cs" />
- <Compile Include="Naming\Events\NamingLookupRequest.cs" />
- <Compile Include="Naming\Events\NamingLookupResponse.cs" />
- <Compile Include="Naming\Events\NamingRegisterRequest.cs" />
- <Compile Include="Naming\Events\NamingRegisterResponse.cs" />
- <Compile Include="Naming\Events\NamingUnregisterRequest.cs" />
- <Compile Include="Naming\Events\NamingUnregisterResponse.cs" />
- <Compile Include="Naming\INameServer.cs" />
- <Compile Include="Naming\NameClient.cs" />
- <Compile Include="Naming\NameLookupClient.cs" />
- <Compile Include="Naming\NameRegisterClient.cs" />
- <Compile Include="Naming\NameServer.cs" />
- <Compile Include="Naming\NamingConfiguration.cs" />
- <Compile Include="Naming\NamingConfigurationOptions.cs" />
- <Compile Include="Naming\Observers\NamingGetAllRequestObserver.cs" />
- <Compile Include="Naming\Observers\NamingLookupRequestObserver.cs" />
- <Compile Include="Naming\Observers\NamingRegisterRequestObserver.cs" />
- <Compile Include="Naming\Observers\NamingUnregisterRequestObserver.cs" />
- <Compile Include="NetworkService\Codec\ControlMessageCodec.cs" />
- <Compile Include="NetworkService\Codec\NsMessageCodec.cs" />
- <Compile Include="NetworkService\Codec\NsMessageProto.cs" />
- <Compile Include="NetworkService\ControlMessage.cs" />
- <Compile Include="NetworkService\IConnection.cs" />
- <Compile Include="NetworkService\INetworkService.cs" />
- <Compile Include="NetworkService\NetworkService.cs" />
- <Compile Include="NetworkService\NetworkServiceConfiguration.cs" />
- <Compile Include="NetworkService\NetworkServiceOptions.cs" />
- <Compile Include="NetworkService\NsConnection.cs" />
- <Compile Include="NetworkService\NsMessage.cs" />
- <Compile Include="Properties\AssemblyInfo.cs" />
- <Compile Include="Utilities\BlockingCollectionExtensions.cs" />
- <Compile Include="Utilities\Utils.cs" />
- </ItemGroup>
- <ItemGroup>
- <None Include="packages.config" />
- </ItemGroup>
- <ItemGroup>
- <ProjectReference Include="$(SolutionDir)\Org.Apache.Reef.Tang\Org.Apache.Reef.Tang.csproj">
- <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
- <Name>Org.Apache.Reef.Tang</Name>
- </ProjectReference>
- <ProjectReference Include="$(SolutionDir)\Org.Apache.Reef.Utilities\Org.Apache.Reef.Utilities.csproj">
- <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
- <Name>Org.Apache.Reef.Utilities</Name>
- </ProjectReference>
- <ProjectReference Include="..\..\..\..\Org.Apache.Reef.Common\Org.Apache.Reef.Common.csproj">
- <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
- <Name>Org.Apache.Reef.Common</Name>
- </ProjectReference>
- <ProjectReference Include="..\..\..\..\Org.Apache.Reef.Driver\Org.Apache.Reef.Driver.csproj">
- <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project>
- <Name>Org.Apache.Reef.Driver</Name>
- </ProjectReference>
- <ProjectReference Include="..\..\..\..\Org.Apache.Reef.Wake\Org.Apache.Reef.Wake.csproj">
- <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
- <Name>Org.Apache.Reef.Wake</Name>
- </ProjectReference>
- </ItemGroup>
- <ItemGroup>
- <WCFMetadata Include="Service References\" />
- </ItemGroup>
- <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
- <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
- <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
- Other similar extension points exist, see Microsoft.Common.targets.
- <Target Name="BeforeBuild">
- </Target>
- <Target Name="AfterBuild">
- </Target>
- -->
-</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs
deleted file mode 100644
index 09ecfbe..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/ControlMessageCodec.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService.Codec
-{
- public class ControlMessageCodec : ICodec<ControlMessage>
- {
- [Inject]
- public ControlMessageCodec()
- {
- }
-
- public byte[] Encode(ControlMessage message)
- {
- return BitConverter.GetBytes((int) message);
- }
-
- public ControlMessage Decode(byte[] data)
- {
- return (ControlMessage) BitConverter.ToInt32(data, 0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs
deleted file mode 100644
index 454182e..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageCodec.cs
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using ProtoBuf;
-using System;
-using System.IO;
-using System.Linq;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService.Codec
-{
- /// <summary>
- /// Codec to serialize NsMessages for NetworkService.
- /// </summary>
- /// <typeparam name="T">The message type</typeparam>
- public class NsMessageCodec<T> : ICodec<NsMessage<T>>
- {
- private ICodec<T> _codec;
- private IIdentifierFactory _idFactory;
-
- /// <summary>
- /// Create new NsMessageCodec.
- /// </summary>
- /// <param name="codec">The codec used to serialize message data</param>
- /// <param name="idFactory">Used to create identifier from string.</param>
- public NsMessageCodec(ICodec<T> codec, IIdentifierFactory idFactory)
- {
- _codec = codec;
- _idFactory = idFactory;
- }
-
- /// <summary>
- /// Serialize the NsMessage.
- /// </summary>
- /// <param name="obj">The object to serialize</param>
- /// <returns>The serialized object in byte array form</returns>
- public byte[] Encode(NsMessage<T> obj)
- {
- NsMessageProto proto = NsMessageProto.Create(obj, _codec);
- using (var stream = new MemoryStream())
- {
- Serializer.Serialize(stream, proto);
- return stream.ToArray();
- }
- }
-
- /// <summary>
- /// Deserialize the byte array into NsMessage.
- /// </summary>
- /// <param name="data">The serialized byte array</param>
- /// <returns>The deserialized NsMessage</returns>
- public NsMessage<T> Decode(byte[] data)
- {
- using (var stream = new MemoryStream(data))
- {
- NsMessageProto proto = Serializer.Deserialize<NsMessageProto>(stream);
-
- IIdentifier sourceId = _idFactory.Create(proto.SourceId);
- IIdentifier destId = _idFactory.Create(proto.DestId);
- NsMessage<T> message = new NsMessage<T>(sourceId, destId);
-
- var messages = proto.Data.Select(byteArr => _codec.Decode(byteArr));
- message.Data.AddRange(messages);
- return message;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs
deleted file mode 100644
index 8345e3a..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/Codec/NsMessageProto.cs
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Runtime.Serialization;
-using System.Text;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Wake.Remote;
-using ProtoBuf;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService.Codec
-{
- [ProtoContract]
- public class NsMessageProto
- {
- public NsMessageProto()
- {
- Data = new List<byte[]>();
- }
-
- [ProtoMember(1)]
- public string SourceId { get; set; }
-
- [ProtoMember(2)]
- public string DestId { get; set; }
-
- [ProtoMember(3)]
- public List<byte[]> Data { get; set; }
-
- public static NsMessageProto Create<T>(NsMessage<T> message, ICodec<T> codec)
- {
- NsMessageProto proto = new NsMessageProto();
-
- proto.SourceId = message.SourceId.ToString();
- proto.DestId = message.DestId.ToString();
-
- foreach (T item in message.Data)
- {
- proto.Data.Add(codec.Encode(item));
- }
-
- return proto;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs
deleted file mode 100644
index 1309c32..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/ControlMessage.cs
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace Org.Apache.REEF.IO.Network.NetworkService
-{
- public enum ControlMessage
- {
- /// <summary>
- /// default state
- /// </summary>
- UNDEFINED = 0,
-
- /// <summary>
- /// expecting data to be sent/received
- /// </summary>
- RECEIVE = 1,
-
- /// <summary>
- /// stop group communications
- /// </summary>
- STOP = 2,
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs
deleted file mode 100644
index 0a13d60..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/IConnection.cs
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService
-{
- /// <summary>
- /// Represents a connection between two endpoints named by identifiers
- /// </summary>
- public interface IConnection<T> : IDisposable
- {
- /// <summary>
- /// Opens the connection
- /// </summary>
- void Open();
-
- /// <summary>
- /// Writes the object to the connection
- /// </summary>
- /// <param name="obj">The message to send</param>
- void Write(T obj);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs
deleted file mode 100644
index f9e1a0b..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/INetworkService.cs
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.Services;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService
-{
- /// <summary>
- /// Network service used for Reef Task communication.
- /// </summary>
- /// <typeparam name="T">The message type</typeparam>
- public interface INetworkService<T> : IService, IDisposable
- {
- /// <summary>
- /// Name client for registering ids
- /// </summary>
- INameClient NamingClient { get; }
-
- /// <summary>
- /// Open a new connection to the remote host registered to
- /// the name service with the given identifier
- /// </summary>
- /// <param name="destinationId">The identifier of the remote host</param>
- /// <returns>The IConnection used for communication</returns>
- IConnection<T> NewConnection(IIdentifier destinationId);
-
- /// <summary>
- /// Register the identifier for the NetworkService with the NameService.
- /// </summary>
- /// <param name="id">The identifier to register</param>
- void Register(IIdentifier id);
-
- /// <summary>
- /// Unregister the identifier for the NetworkService with the NameService.
- /// </summary>
- void Unregister();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs
deleted file mode 100644
index 041cd8d..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkService.cs
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Reactive;
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming;
-using Org.Apache.REEF.IO.Network.NetworkService.Codec;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService
-{
- /// <summary>
- /// Network service used for Reef Task communication.
- /// Combines a remote manager (message transport) with a name client
- /// (identifier registration and lookup) and caches opened connections.
- /// </summary>
- /// <typeparam name="T">The message type</typeparam>
- public class NetworkService<T> : INetworkService<T>
- {
- // Static and readonly, in line with the logger declaration used by NsConnection.
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(NetworkService<>));
-
- // Assigned once in the constructor.
- private readonly IRemoteManager<NsMessage<T>> _remoteManager;
- private readonly IObserver<NsMessage<T>> _messageHandler;
- private readonly ICodec<NsMessage<T>> _codec;
- private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
-
- // Set by Register and cleared by Unregister.
- private IIdentifier _localIdentifier;
- private IDisposable _messageHandlerDisposable;
-
- /// <summary>
- /// Create a new NetworkService.
- /// </summary>
- /// <param name="nsPort">The port that the NetworkService will listen on</param>
- /// <param name="nameServerAddr">The address of the NameServer</param>
- /// <param name="nameServerPort">The port of the NameServer</param>
- /// <param name="messageHandler">The observer to handle incoming messages</param>
- /// <param name="idFactory">The factory used to create IIdentifiers</param>
- /// <param name="codec">The codec used for serialization</param>
- [Inject]
- public NetworkService(
- [Parameter(typeof(NetworkServiceOptions.NetworkServicePort))] int nsPort,
- [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string nameServerAddr,
- [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int nameServerPort,
- IObserver<NsMessage<T>> messageHandler,
- IIdentifierFactory idFactory,
- ICodec<T> codec)
- {
- // Wrap the payload codec so whole NsMessages (ids + data) can be serialized.
- _codec = new NsMessageCodec<T>(codec, idFactory);
-
- IPAddress localAddress = NetworkUtils.LocalIPAddress;
- _remoteManager = new DefaultRemoteManager<NsMessage<T>>(localAddress, nsPort, _codec);
- _messageHandler = messageHandler;
-
- NamingClient = new NameClient(nameServerAddr, nameServerPort);
- _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
-
- LOGGER.Log(Level.Info, "Started network service");
- }
-
- /// <summary>
- /// Name client for registering ids
- /// </summary>
- public INameClient NamingClient { get; private set; }
-
- /// <summary>
- /// Open a new connection to the remote host registered to
- /// the name service with the given identifier. A connection that was
- /// already created for the same identifier is returned from the cache.
- /// </summary>
- /// <param name="destinationId">The identifier of the remote host</param>
- /// <returns>The IConnection used for communication</returns>
- /// <exception cref="IllegalStateException">If no identifier has been registered yet</exception>
- public IConnection<T> NewConnection(IIdentifier destinationId)
- {
- if (_localIdentifier == null)
- {
- throw new IllegalStateException("Cannot open connection without first registering an ID");
- }
-
- IConnection<T> connection;
- if (_connectionMap.TryGetValue(destinationId, out connection))
- {
- return connection;
- }
-
- // The connection receives the cache so it can remove itself when disposed.
- connection = new NsConnection<T>(_localIdentifier, destinationId,
- NamingClient, _remoteManager, _connectionMap);
-
- _connectionMap[destinationId] = connection;
- return connection;
- }
-
- /// <summary>
- /// Register the identifier for the NetworkService with the NameService.
- /// </summary>
- /// <param name="id">The identifier to register</param>
- /// <exception cref="ArgumentNullException">If id is null</exception>
- public void Register(IIdentifier id)
- {
- // Fail before touching any state instead of hitting a null dereference half way through.
- if (id == null)
- {
- throw new ArgumentNullException("id");
- }
-
- LOGGER.Log(Level.Info, "Registering id {0} with network service.", id);
-
- // Remember the id only once the name service has accepted it, so a failed
- // registration does not leave this service looking registered.
- NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
- _localIdentifier = id;
-
- // Create and register incoming message handler
- var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
- _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
- }
-
- /// <summary>
- /// Unregister the identifier for the NetworkService with the NameService.
- /// </summary>
- /// <exception cref="IllegalStateException">If no identifier is currently registered</exception>
- public void Unregister()
- {
- if (_localIdentifier == null)
- {
- throw new IllegalStateException("Cannot unregister a non existant identifier");
- }
-
- NamingClient.Unregister(_localIdentifier.ToString());
- _localIdentifier = null;
- ReleaseMessageHandler();
- }
-
- /// <summary>
- /// Dispose of the NetworkService's resources
- /// </summary>
- public void Dispose()
- {
- // Drop the observer registration first in case Unregister was never called.
- ReleaseMessageHandler();
- NamingClient.Dispose();
- _remoteManager.Dispose();
-
- LOGGER.Log(Level.Info, "Disposed of network service");
- }
-
- /// <summary>
- /// Disposes the incoming message handler registration, if there is one,
- /// and forgets it so it is not disposed a second time.
- /// </summary>
- private void ReleaseMessageHandler()
- {
- IDisposable handlerRegistration = _messageHandlerDisposable;
- _messageHandlerDisposable = null;
- if (handlerRegistration != null)
- {
- handlerRegistration.Dispose();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs
deleted file mode 100644
index 29c4097..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceConfiguration.cs
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.IO.Network.Naming;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService
-{
- /// <summary>
- /// Configuration module builder for the NetworkService. Declares the values
- /// a caller has to supply and binds them to the named parameters and
- /// interfaces listed in <see cref="ConfigurationModule"/>.
- /// </summary>
- public class NetworkServiceConfiguration : ConfigurationModuleBuilder
- {
- /// <summary>
- /// Required int parameter; bound to NetworkServiceOptions.NetworkServicePort.
- /// </summary>
- [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
- public static readonly RequiredParameter<int> NetworkServicePort = new RequiredParameter<int>();
-
- /// <summary>
- /// Required implementation; bound to the ICodecFactory interface.
- /// </summary>
- [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
- public static readonly RequiredImpl<ICodecFactory> NetworkServiceCodecFactory = new RequiredImpl<ICodecFactory>();
-
- /// <summary>
- /// Builds the configuration module for the NetworkService.
- /// A new builder is created and built on every access of this property.
- /// </summary>
- public static ConfigurationModule ConfigurationModule
- {
- get
- {
- // The name server port and address are bound to the parameters declared on
- // NamingConfiguration rather than to parameters of this class.
- return new NetworkServiceConfiguration()
- .BindNamedParameter(GenericType<NetworkServiceOptions.NetworkServicePort>.Class, NetworkServicePort)
- .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerPort>.Class,
- NamingConfiguration.NameServerPort)
- .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
- NamingConfiguration.NameServerAddress)
- .BindImplementation(GenericType<ICodecFactory>.Class, NetworkServiceCodecFactory)
- .Build();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
deleted file mode 100644
index dcf5bcc..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NetworkServiceOptions.cs
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService
-{
- /// <summary>
- /// Holder class for the named parameters of the NetworkService.
- /// </summary>
- public class NetworkServiceOptions
- {
- /// <summary>
- /// Named parameter of type int for the port of the NetworkService.
- /// NOTE(review): the attribute arguments look like (documentation, short name,
- /// default value), i.e. short name "NsPort" and default "0" - confirm against
- /// the NamedParameter attribute definition.
- /// </summary>
- [NamedParameter("Port of NetworkService", "NsPort", "0")]
- public class NetworkServicePort : Name<int>
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs
deleted file mode 100644
index 7e586d8..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsConnection.cs
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using System.Runtime.Remoting;
-using Org.Apache.REEF.Common.io;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService
-{
- /// <summary>
- /// Represents a connection between two hosts using the NetworkService.
- /// The connection must be opened with <see cref="Open"/> before
- /// <see cref="Write"/> can be used.
- /// </summary>
- /// <typeparam name="T">The message type</typeparam>
- public class NsConnection<T> : IConnection<T>
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof(NsConnection<T>));
-
- // Assigned once in the constructor.
- private readonly IIdentifier _sourceId;
- private readonly IIdentifier _destId;
- private readonly INameClient _nameClient;
- private readonly IRemoteManager<NsMessage<T>> _remoteManager;
- private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
-
- // Null until Open succeeds.
- private IObserver<NsMessage<T>> _remoteSender;
-
- /// <summary>
- /// Creates a new NsConnection between two hosts.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- /// <param name="nameClient">The NameClient used for naming lookup</param>
- /// <param name="remoteManager">The remote manager used for network communication</param>
- /// <param name="connectionMap">A cache of opened connections. Will remove itself from
- /// the cache when the NsConnection is disposed.</param>
- public NsConnection(
- IIdentifier sourceId,
- IIdentifier destId,
- INameClient nameClient,
- IRemoteManager<NsMessage<T>> remoteManager,
- Dictionary<IIdentifier, IConnection<T>> connectionMap)
- {
- _sourceId = sourceId;
- _destId = destId;
- _nameClient = nameClient;
- _remoteManager = remoteManager;
- _connectionMap = connectionMap;
- }
-
- /// <summary>
- /// Opens the connection to the remote host.
- /// Looks up the destination identifier with the name client and obtains
- /// a remote observer for the resulting endpoint.
- /// </summary>
- /// <exception cref="RemotingException">If the name client returns no address for the destination</exception>
- public void Open()
- {
- string destStr = _destId.ToString();
- LOGGER.Log(Level.Verbose, "Network service opening connection to {0}...", destStr);
-
- IPEndPoint destAddr = _nameClient.Lookup(destStr);
- if (destAddr == null)
- {
- // This is a failed lookup, not a failed registration; say so and name the id.
- throw new RemotingException(string.Format(CultureInfo.InvariantCulture,
- "Cannot find address for Identifier {0} in NameService", destStr));
- }
-
- try
- {
- _remoteSender = _remoteManager.GetRemoteObserver(destAddr);
- LOGGER.Log(Level.Verbose, "Network service completed connection to {0}.", destStr);
- }
- catch (SocketException)
- {
- LOGGER.Log(Level.Error, "Network Service cannot open connection to {0}", destAddr);
- throw;
- }
- catch (ObjectDisposedException)
- {
- LOGGER.Log(Level.Error, "Network Service cannot open connection to {0}", destAddr);
- throw;
- }
- }
-
- /// <summary>
- /// Writes the object to the remote host.
- /// </summary>
- /// <param name="message">The message to send</param>
- /// <exception cref="IllegalStateException">If the connection has not been opened</exception>
- public void Write(T message)
- {
- if (_remoteSender == null)
- {
- throw new IllegalStateException("NsConnection has not been opened yet.");
- }
-
- try
- {
- // Each write is wrapped in its own NsMessage carrying both identifiers.
- _remoteSender.OnNext(new NsMessage<T>(_sourceId, _destId, message));
- }
- catch (IOException)
- {
- LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
- throw;
- }
- catch (ObjectDisposedException)
- {
- LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
- throw;
- }
- }
-
- /// <summary>
- /// Closes the connection by removing the entry for the destination
- /// identifier from the connection cache.
- /// </summary>
- public void Dispose()
- {
- _connectionMap.Remove(_destId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsMessage.cs b/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsMessage.cs
deleted file mode 100644
index 3c99ffd..0000000
--- a/lang/cs/Source/REEF/reef-io/Network/NetworkService/NsMessage.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using Org.Apache.REEF.Wake;
-
-namespace Org.Apache.REEF.IO.Network.NetworkService
-{
- /// <summary>
- /// Envelope exchanged between NetworkServices: sender id, receiver id
- /// and the list of payload items.
- /// </summary>
- /// <typeparam name="T">The type of data being sent</typeparam>
- public class NsMessage<T>
- {
- /// <summary>
- /// Builds a message that carries no payload yet; Data starts out empty.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- public NsMessage(IIdentifier sourceId, IIdentifier destId)
- {
- Data = new List<T>();
- SourceId = sourceId;
- DestId = destId;
- }
-
- /// <summary>
- /// Builds a message whose Data holds exactly one item.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- /// <param name="message">The message to send</param>
- public NsMessage(IIdentifier sourceId, IIdentifier destId, T message)
- : this(sourceId, destId)
- {
- // The chained constructor created the empty list; append the single payload item.
- Data.Add(message);
- }
-
- /// <summary>
- /// Identifier of the host that sent the message.
- /// </summary>
- public IIdentifier SourceId { get; private set; }
-
- /// <summary>
- /// Identifier of the host the message is addressed to.
- /// </summary>
- public IIdentifier DestId { get; private set; }
-
- /// <summary>
- /// Payload items carried by the message.
- /// </summary>
- public List<T> Data { get; private set; }
- }
-}