You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/06 02:33:00 UTC
[4/5] incubator-reef git commit: [REEF-139] Changing .Net project
structure for Network and Evaluator
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NameLookupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameLookupClient.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameLookupClient.cs
new file mode 100644
index 0000000..b8c4018
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameLookupClient.cs
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Threading;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Network.Naming.Events;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Naming
+{
+ /// <summary>
+ /// Helper class to send lookup events to the name server
+ /// </summary>
+ internal class NameLookupClient
+ {
+ private TransportClient<NamingEvent> _client;
+ private BlockingCollection<NamingLookupResponse> _lookupResponseQueue;
+ private BlockingCollection<NamingGetAllResponse> _getAllResponseQueue;
+
+ /// <summary>
+ /// Constructs a new NameLookupClient.
+ /// </summary>
+ /// <param name="client">The transport client used to connect to the NameServer</param>
+ /// <param name="lookupQueue">The queue used to signal that a response
+ /// has been received from the NameServer</param>
+ /// <param name="getAllQueue">The queue used to signal that a GetAllResponse
+ /// has been received from the NameServer</param>
+ public NameLookupClient(TransportClient<NamingEvent> client,
+ BlockingCollection<NamingLookupResponse> lookupQueue,
+ BlockingCollection<NamingGetAllResponse> getAllQueue)
+ {
+ _client = client;
+ _lookupResponseQueue = lookupQueue;
+ _getAllResponseQueue = getAllQueue;
+ }
+
+ /// <summary>
+ /// Look up the IPEndPoint that has been registered with the NameServer using
+ /// the given identifier as the key.
+ /// </summary>
+ /// <param name="id">The id for the IPEndPoint</param>
+ /// <param name="token">The cancellation token used for timeout</param>
+ /// <returns>The registered IPEndpoint, or null if the identifer has not
+ /// been registered with the NameServer or if the operation times out.</returns>
+ public IPEndPoint Lookup(string id, CancellationToken token)
+ {
+ List<string> ids = new List<string> { id };
+ List<NameAssignment> assignment = Lookup(ids);
+ return (assignment == null || assignment.Count == 0) ? null : assignment.First().Endpoint;
+ }
+
+ /// <summary>
+ /// Look up IPEndPoints that have been registered with the NameService
+ /// </summary>
+ /// <param name="ids">The list of ids to look up</param>
+ /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint
+ /// pairs</returns>
+ public List<NameAssignment> Lookup(List<string> ids)
+ {
+ _client.Send(new NamingLookupRequest(ids));
+ NamingLookupResponse response = _lookupResponseQueue.Take();
+ return response.NameAssignments;
+ }
+
+ /// <summary>
+ /// Synchronously gets all of the identifier/IPEndpoint pairs registered with the NameService.
+ /// </summary>
+ /// <returns>A list of NameAssignments representing the mapped identifier/IPEndpoint
+ /// pairs</returns>
+ public List<NameAssignment> GetAll()
+ {
+ _client.Send(new NamingGetAllRequest());
+ NamingGetAllResponse response = _getAllResponseQueue.Take();
+ return response.Assignments;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NameRegisterClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameRegisterClient.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameRegisterClient.cs
new file mode 100644
index 0000000..a18cb31
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameRegisterClient.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Net;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Network.Naming.Events;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Network.Naming
+{
+ /// <summary>
+ /// Helper class to send register and unregister events to the NameServer.
+ /// </summary>
+ internal class NameRegisterClient
+ {
+ private TransportClient<NamingEvent> _client;
+ private BlockingCollection<NamingRegisterResponse> _registerResponseQueue;
+ private BlockingCollection<NamingUnregisterResponse> _unregisterResponseQueue;
+
+ public NameRegisterClient(TransportClient<NamingEvent> client,
+ BlockingCollection<NamingRegisterResponse> registerQueue,
+ BlockingCollection<NamingUnregisterResponse> unregisterQueue)
+ {
+ _client = client;
+ _registerResponseQueue = registerQueue;
+ _unregisterResponseQueue = unregisterQueue;
+ }
+
+ /// <summary>
+ /// Synchronously register the id and endpoint with the NameServer.
+ /// </summary>
+ /// <param name="id">The identifier</param>
+ /// <param name="endpoint">The endpoint</param>
+ public void Register(string id, IPEndPoint endpoint)
+ {
+ NameAssignment assignment = new NameAssignment(id, endpoint);
+ _client.Send(new NamingRegisterRequest(assignment));
+ _registerResponseQueue.Take();
+ }
+
+ /// <summary>
+ /// Synchronously unregisters the identifier with the NameServer.
+ /// </summary>
+ /// <param name="id">The identifer to unregister</param>
+ public void Unregister(string id)
+ {
+ _client.Send(new NamingUnregisterRequest(id));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs
new file mode 100644
index 0000000..26207e0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Network.Naming.Codec;
+using Org.Apache.REEF.Network.Naming.Events;
+using Org.Apache.REEF.Network.Naming.Observers;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.RX;
+using Org.Apache.REEF.Wake.RX.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+
+namespace Org.Apache.REEF.Network.Naming
+{
+ /// <summary>
+ /// Service that manages names and IPEndpoints for well known hosts.
+ /// Can register, unregister, and look up IPAddresses using a string identifier.
+ /// </summary>
+ public class NameServer : INameServer
+ {
+ private static Logger _logger = Logger.GetLogger(typeof(NameServer));
+
+ private TransportServer<NamingEvent> _server;
+ private Dictionary<string, IPEndPoint> _idToAddrMap;
+
+ /// <summary>
+ /// Create a new NameServer to run on the specified port.
+ /// </summary>
+ /// <param name="port">The port to listen for incoming connections on.</param>
+ [Inject]
+ public NameServer([Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port)
+ {
+ IObserver<TransportEvent<NamingEvent>> handler = CreateServerHandler();
+ _idToAddrMap = new Dictionary<string, IPEndPoint>();
+ ICodec<NamingEvent> codec = CreateServerCodec();
+
+ // Start transport server, get listening IP endpoint
+ _logger.Log(Level.Info, "Starting naming server");
+ _server = new TransportServer<NamingEvent>(port, handler, codec);
+ _server.Run();
+ LocalEndpoint = _server.LocalEndpoint;
+ }
+
+ public IPEndPoint LocalEndpoint { get; private set; }
+
+ /// <summary>
+ /// Looks up the IPEndpoints for each string identifier
+ /// </summary>
+ /// <param name="ids">The IDs to look up</param>
+ /// <returns>A list of Name assignments representing the identifier
+ /// that was searched for and the mapped IPEndpoint</returns>
+ public List<NameAssignment> Lookup(List<string> ids)
+ {
+ if (ids == null)
+ {
+ Exceptions.Throw(new ArgumentNullException("ids"), _logger);
+ }
+
+ return ids.Where(id => _idToAddrMap.ContainsKey(id))
+ .Select(id => new NameAssignment(id, _idToAddrMap[id]))
+ .ToList();
+ }
+
+ /// <summary>
+ /// Gets all of the registered identifier/endpoint pairs.
+ /// </summary>
+ /// <returns>A list of all of the registered identifiers and their
+ /// mapped IPEndpoints</returns>
+ public List<NameAssignment> GetAll()
+ {
+ return _idToAddrMap.Select(pair => new NameAssignment(pair.Key, pair.Value)).ToList();
+ }
+
+ /// <summary>
+ /// Registers the string identifier with the given IPEndpoint
+ /// </summary>
+ /// <param name="id">The string ident</param>
+ /// <param name="endpoint">The mapped endpoint</param>
+ public void Register(string id, IPEndPoint endpoint)
+ {
+ if (id == null)
+ {
+ Exceptions.Throw(new ArgumentNullException("id"), _logger);
+ }
+ if (endpoint == null)
+ {
+ Exceptions.Throw(new ArgumentNullException("endpoint"), _logger);
+ }
+
+ _logger.Log(Level.Info, "Registering id: " + id + ", and endpoint: " + endpoint);
+ _idToAddrMap[id] = endpoint;
+ }
+
+ /// <summary>
+ /// Unregister the given identifier with the NameServer
+ /// </summary>
+ /// <param name="id">The identifier to unregister</param>
+ public void Unregister(string id)
+ {
+ if (id == null)
+ {
+ Exceptions.Throw(new ArgumentNullException("id"), _logger);
+ }
+
+ _logger.Log(Level.Info, "Unregistering id: " + id);
+ _idToAddrMap.Remove(id);
+ }
+
+ /// <summary>
+ /// Stops the NameServer
+ /// </summary>
+ public void Dispose()
+ {
+ _server.Dispose();
+ }
+
+ /// <summary>
+ /// Create the handler to manage incoming NamingEvent types
+ /// </summary>
+ /// <returns>The server handler</returns>
+ private IObserver<TransportEvent<NamingEvent>> CreateServerHandler()
+ {
+ PubSubSubject<NamingEvent> subject = new PubSubSubject<NamingEvent>();
+ subject.Subscribe(new NamingLookupRequestObserver(this));
+ subject.Subscribe(new NamingGetAllRequestObserver(this));
+ subject.Subscribe(new NamingRegisterRequestObserver(this));
+ subject.Subscribe(new NamingUnregisterRequestObserver(this));
+ return new ServerHandler(subject);
+ }
+
+ /// <summary>
+ /// Create the codec used to serialize/deserialize NamingEvent messages
+ /// </summary>
+ /// <returns>The serialization codec</returns>
+ private ICodec<NamingEvent> CreateServerCodec()
+ {
+ MultiCodec<NamingEvent> codec = new MultiCodec<NamingEvent>();
+ codec.Register(new NamingLookupRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupRequest");
+ codec.Register(new NamingLookupResponseCodec(), "org.apache.reef.io.network.naming.serialization.NamingLookupResponse");
+ NamingRegisterRequestCodec requestCodec = new NamingRegisterRequestCodec();
+ codec.Register(requestCodec, "org.apache.reef.io.network.naming.serialization.NamingRegisterRequest");
+ codec.Register(new NamingRegisterResponseCodec(requestCodec), "org.apache.reef.io.network.naming.serialization.NamingRegisterResponse");
+ codec.Register(new NamingUnregisterRequestCodec(), "org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest");
+ return codec;
+ }
+
+ [NamedParameter("Port for the NameServer to listen on")]
+ public class Port : Name<int>
+ {
+ }
+
+ /// <summary>
+ /// Class used to handle incoming NamingEvent messages.
+ /// Delegates the event to the prescribed handler depending on its type
+ /// </summary>
+ private class ServerHandler : AbstractObserver<TransportEvent<NamingEvent>>
+ {
+ private IObserver<NamingEvent> _handler;
+
+ public ServerHandler(IObserver<NamingEvent> handler)
+ {
+ _handler = handler;
+ }
+
+ public override void OnNext(TransportEvent<NamingEvent> value)
+ {
+ NamingEvent message = value.Data;
+ message.Link = value.Link;
+ _handler.OnNext(message);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfiguration.cs
new file mode 100644
index 0000000..3daac70
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfiguration.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Naming
+{
+ public class NamingConfiguration : ConfigurationModuleBuilder
+ {
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly RequiredParameter<string> NameServerAddress = new RequiredParameter<string>();
+
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly RequiredParameter<int> NameServerPort = new RequiredParameter<int>();
+
+ public static ConfigurationModule ConfigurationModule
+ {
+ get
+ {
+ return new NamingConfiguration()
+ .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerAddress>.Class, NameServerAddress)
+ .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerPort>.Class, NameServerPort)
+ .Build();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfigurationOptions.cs
new file mode 100644
index 0000000..aa9b6e6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/NamingConfigurationOptions.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Naming
+{
+ public class NamingConfigurationOptions
+ {
+ [NamedParameter("IP address of NameServer")]
+ public class NameServerAddress : Name<string>
+ {
+ }
+
+ [NamedParameter("Port of NameServer")]
+ public class NameServerPort : Name<int>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingGetAllRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingGetAllRequestObserver.cs b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingGetAllRequestObserver.cs
new file mode 100644
index 0000000..df3a4a9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingGetAllRequestObserver.cs
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Network.Naming.Events;
+using Org.Apache.REEF.Wake.RX;
+
+namespace Org.Apache.REEF.Network.Naming.Observers
+{
+ /// <summary>
+ /// Handler for NameService for events of type NamingGetAllRequest.
+ /// Gets all of the identifiers and their mapped IPEndpoints registered
+ /// with the NameServer.
+ /// </summary>
+ internal class NamingGetAllRequestObserver : AbstractObserver<NamingGetAllRequest>
+ {
+ private NameServer _server;
+
+ public NamingGetAllRequestObserver(NameServer server)
+ {
+ _server = server;
+ }
+
+ public override void OnNext(NamingGetAllRequest value)
+ {
+ List<NameAssignment> assignments = _server.GetAll();
+ value.Link.Write(new NamingGetAllResponse(assignments));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingLookupRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingLookupRequestObserver.cs b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingLookupRequestObserver.cs
new file mode 100644
index 0000000..21c602d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingLookupRequestObserver.cs
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Network.Naming.Events;
+using Org.Apache.REEF.Wake.RX;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Network.Naming.Observers
+{
+ /// <summary>
+ /// Handler for looking up IPEndpoints registered with the NameServer
+ /// </summary>
+ internal class NamingLookupRequestObserver : AbstractObserver<NamingLookupRequest>
+ {
+ private NameServer _server;
+
+ public NamingLookupRequestObserver(NameServer server)
+ {
+ _server = server;
+ }
+
+ /// <summary>
+ /// Look up the IPEndpoints for the given identifiers and write them
+ /// back to the NameClient
+ /// </summary>
+ /// <param name="value">The lookup request event</param>
+ public override void OnNext(NamingLookupRequest value)
+ {
+ List<NameAssignment> assignments = _server.Lookup(value.Identifiers);
+ value.Link.Write(new NamingLookupResponse(assignments));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingRegisterRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingRegisterRequestObserver.cs b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingRegisterRequestObserver.cs
new file mode 100644
index 0000000..8ab8f6c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingRegisterRequestObserver.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Network.Naming.Events;
+using Org.Apache.REEF.Wake.RX;
+
+namespace Org.Apache.REEF.Network.Naming.Observers
+{
+ /// <summary>
+ /// Handler for registering an identifier and endpoint with the Name Service
+ /// </summary>
+ internal class NamingRegisterRequestObserver : AbstractObserver<NamingRegisterRequest>
+ {
+ private NameServer _server;
+
+ public NamingRegisterRequestObserver(NameServer server)
+ {
+ _server = server;
+ }
+
+ /// <summary>
+ /// Register the identifier and IPEndpoint with the NameServer and send
+ /// the response back to the NameClient
+ /// </summary>
+ /// <param name="value">The register request event</param>
+ public override void OnNext(NamingRegisterRequest value)
+ {
+ NameAssignment assignment = value.NameAssignment;
+ _server.Register(assignment.Identifier, assignment.Endpoint);
+
+ value.Link.Write(new NamingRegisterResponse(value));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingUnregisterRequestObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingUnregisterRequestObserver.cs b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingUnregisterRequestObserver.cs
new file mode 100644
index 0000000..6127a4d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/Observers/NamingUnregisterRequestObserver.cs
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Naming.Events;
+using Org.Apache.REEF.Wake.RX;
+
+namespace Org.Apache.REEF.Network.Naming.Observers
+{
+ /// <summary>
+ /// Handler for unregistering an identifier with the NameServer
+ /// </summary>
+ internal class NamingUnregisterRequestObserver : AbstractObserver<NamingUnregisterRequest>
+ {
+ private NameServer _server;
+
+ public NamingUnregisterRequestObserver(NameServer server)
+ {
+ _server = server;
+ }
+
+ /// <summary>
+ /// Unregister the identifer with the NameServer.
+ /// </summary>
+ /// <param name="value">The unregister request event</param>
+ public override void OnNext(NamingUnregisterRequest value)
+ {
+ // Don't send a response
+ _server.Unregister(value.Identifier);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/ControlMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/ControlMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/ControlMessageCodec.cs
new file mode 100644
index 0000000..471e651
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/ControlMessageCodec.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+ public class ControlMessageCodec : ICodec<ControlMessage>
+ {
+ [Inject]
+ public ControlMessageCodec()
+ {
+ }
+
+ public byte[] Encode(ControlMessage message)
+ {
+ return BitConverter.GetBytes((int) message);
+ }
+
+ public ControlMessage Decode(byte[] data)
+ {
+ return (ControlMessage) BitConverter.ToInt32(data, 0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageCodec.cs
new file mode 100644
index 0000000..d01a7ae
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageCodec.cs
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Linq;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+ /// <summary>
+ /// Codec to serialize NsMessages for NetworkService.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class NsMessageCodec<T> : ICodec<NsMessage<T>>
+ {
+ private ICodec<T> _codec;
+ private IIdentifierFactory _idFactory;
+
+ /// <summary>
+ /// Create new NsMessageCodec.
+ /// </summary>
+ /// <param name="codec">The codec used to serialize message data</param>
+ /// <param name="idFactory">Used to create identifier from string.</param>
+ public NsMessageCodec(ICodec<T> codec, IIdentifierFactory idFactory)
+ {
+ _codec = codec;
+ _idFactory = idFactory;
+ }
+
+ /// <summary>
+ /// Serialize the NsMessage.
+ /// </summary>
+ /// <param name="obj">The object to serialize</param>
+ /// <returns>The serialized object in byte array form</returns>
+ public byte[] Encode(NsMessage<T> obj)
+ {
+ NsMessageProto proto = NsMessageProto.Create(obj, _codec);
+ using (var stream = new MemoryStream())
+ {
+ Serializer.Serialize(stream, proto);
+ return stream.ToArray();
+ }
+ }
+
+ /// <summary>
+ /// Deserialize the byte array into NsMessage.
+ /// </summary>
+ /// <param name="data">The serialized byte array</param>
+ /// <returns>The deserialized NsMessage</returns>
+ public NsMessage<T> Decode(byte[] data)
+ {
+ using (var stream = new MemoryStream(data))
+ {
+ NsMessageProto proto = Serializer.Deserialize<NsMessageProto>(stream);
+
+ IIdentifier sourceId = _idFactory.Create(proto.SourceId);
+ IIdentifier destId = _idFactory.Create(proto.DestId);
+ NsMessage<T> message = new NsMessage<T>(sourceId, destId);
+
+ var messages = proto.Data.Select(byteArr => _codec.Decode(byteArr));
+ message.Data.AddRange(messages);
+ return message;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageProto.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageProto.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageProto.cs
new file mode 100644
index 0000000..0de8be1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageProto.cs
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+using ProtoBuf;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+ [ProtoContract]
+ public class NsMessageProto
+ {
+ public NsMessageProto()
+ {
+ Data = new List<byte[]>();
+ }
+
+ [ProtoMember(1)]
+ public string SourceId { get; set; }
+
+ [ProtoMember(2)]
+ public string DestId { get; set; }
+
+ [ProtoMember(3)]
+ public List<byte[]> Data { get; set; }
+
+ public static NsMessageProto Create<T>(NsMessage<T> message, ICodec<T> codec)
+ {
+ NsMessageProto proto = new NsMessageProto();
+
+ proto.SourceId = message.SourceId.ToString();
+ proto.DestId = message.DestId.ToString();
+
+ foreach (T item in message.Data)
+ {
+ proto.Data.Add(codec.Encode(item));
+ }
+
+ return proto;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/ControlMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/ControlMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/ControlMessage.cs
new file mode 100644
index 0000000..bcc7a8c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/ControlMessage.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ public enum ControlMessage
+ {
+ /// <summary>
+ /// default state
+ /// </summary>
+ UNDEFINED = 0,
+
+ /// <summary>
+ /// expecting data to be sent/received
+ /// </summary>
+ RECEIVE = 1,
+
+ /// <summary>
+ /// stop group communications
+ /// </summary>
+ STOP = 2,
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/IConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/IConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/IConnection.cs
new file mode 100644
index 0000000..78b9c37
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/IConnection.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ /// <summary>
+ /// Represents a connection between two endpoints named by identifiers
+ /// </summary>
+ public interface IConnection<T> : IDisposable
+ {
+ /// <summary>
+ /// Opens the connection
+ /// </summary>
+ void Open();
+
+ /// <summary>
+ /// Writes the object to the connection
+ /// </summary>
+ /// <param name="obj">The message to send</param>
+ void Write(T obj);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
new file mode 100644
index 0000000..a4b845a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/INetworkService.cs
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Services;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ /// <summary>
+ /// Network service used for Reef Task communication.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public interface INetworkService<T> : IService, IDisposable
+ {
+ /// <summary>
+ /// Name client for registering ids
+ /// </summary>
+ INameClient NamingClient { get; }
+
+ /// <summary>
+ /// Open a new connection to the remote host registered to
+ /// the name service with the given identifier
+ /// </summary>
+ /// <param name="destinationId">The identifier of the remote host</param>
+ /// <returns>The IConnection used for communication</returns>
+ IConnection<T> NewConnection(IIdentifier destinationId);
+
+ /// <summary>
+ /// Register the identifier for the NetworkService with the NameService.
+ /// </summary>
+ /// <param name="id">The identifier to register</param>
+ void Register(IIdentifier id);
+
+ /// <summary>
+ /// Unregister the identifier for the NetworkService with the NameService.
+ /// </summary>
+ void Unregister();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
new file mode 100644
index 0000000..57eae81
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Reactive;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ /// <summary>
+ /// Network service used for Reef Task communication.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class NetworkService<T> : INetworkService<T>
+ {
+ private Logger LOGGER = Logger.GetLogger(typeof(NetworkService<>));
+
+ private IRemoteManager<NsMessage<T>> _remoteManager;
+ private IObserver<NsMessage<T>> _messageHandler;
+ private ICodec<NsMessage<T>> _codec;
+ private IIdentifier _localIdentifier;
+ private IDisposable _messageHandlerDisposable;
+ private Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+
+ /// <summary>
+ /// Create a new NetworkFactory.
+ /// </summary>
+ /// <param name="nsPort">The port that the NetworkService will listen on</param>
+ /// <param name="nameServerAddr">The address of the NameServer</param>
+ /// <param name="nameServerPort">The port of the NameServer</param>
+ /// <param name="messageHandler">The observer to handle incoming messages</param>
+ /// <param name="idFactory">The factory used to create IIdentifiers</param>
+ /// <param name="codec">The codec used for serialization</param>
+ [Inject]
+ public NetworkService(
+ [Parameter(typeof(NetworkServiceOptions.NetworkServicePort))] int nsPort,
+ [Parameter(typeof(NamingConfigurationOptions.NameServerAddress))] string nameServerAddr,
+ [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int nameServerPort,
+ IObserver<NsMessage<T>> messageHandler,
+ IIdentifierFactory idFactory,
+ ICodec<T> codec)
+ {
+ _codec = new NsMessageCodec<T>(codec, idFactory);
+
+ IPAddress localAddress = NetworkUtils.LocalIPAddress;
+ _remoteManager = new DefaultRemoteManager<NsMessage<T>>(localAddress, nsPort, _codec);
+ _messageHandler = messageHandler;
+
+ NamingClient = new NameClient(nameServerAddr, nameServerPort);
+ _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
+
+ LOGGER.Log(Level.Info, "Started network service");
+ }
+
+ /// <summary>
+ /// Name client for registering ids
+ /// </summary>
+ public INameClient NamingClient { get; private set; }
+
+ /// <summary>
+ /// Open a new connection to the remote host registered to
+ /// the name service with the given identifier
+ /// </summary>
+ /// <param name="destinationId">The identifier of the remote host</param>
+ /// <returns>The IConnection used for communication</returns>
+ public IConnection<T> NewConnection(IIdentifier destinationId)
+ {
+ if (_localIdentifier == null)
+ {
+ throw new IllegalStateException("Cannot open connection without first registering an ID");
+ }
+
+ IConnection<T> connection;
+ if (_connectionMap.TryGetValue(destinationId, out connection))
+ {
+ return connection;
+ }
+
+ connection = new NsConnection<T>(_localIdentifier, destinationId,
+ NamingClient, _remoteManager, _connectionMap);
+
+ _connectionMap[destinationId] = connection;
+ return connection;
+ }
+
+ /// <summary>
+ /// Register the identifier for the NetworkService with the NameService.
+ /// </summary>
+ /// <param name="id">The identifier to register</param>
+ public void Register(IIdentifier id)
+ {
+ LOGGER.Log(Level.Info, "Registering id {0} with network service.", id);
+
+ _localIdentifier = id;
+ NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
+
+ // Create and register incoming message handler
+ var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+ _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
+ }
+
+ /// <summary>
+ /// Unregister the identifier for the NetworkService with the NameService.
+ /// </summary>
+ public void Unregister()
+ {
+ if (_localIdentifier == null)
+ {
+ throw new IllegalStateException("Cannot unregister a non existant identifier");
+ }
+
+ NamingClient.Unregister(_localIdentifier.ToString());
+ _localIdentifier = null;
+ _messageHandlerDisposable.Dispose();
+ }
+
+ /// <summary>
+ /// Dispose of the NetworkService's resources
+ /// </summary>
+ public void Dispose()
+ {
+ NamingClient.Dispose();
+ _remoteManager.Dispose();
+
+ LOGGER.Log(Level.Info, "Disposed of network service");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceConfiguration.cs
new file mode 100644
index 0000000..003260c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceConfiguration.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ public class NetworkServiceConfiguration : ConfigurationModuleBuilder
+ {
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly RequiredParameter<int> NetworkServicePort = new RequiredParameter<int>();
+
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly RequiredImpl<ICodecFactory> NetworkServiceCodecFactory = new RequiredImpl<ICodecFactory>();
+
+ public static ConfigurationModule ConfigurationModule
+ {
+ get
+ {
+ return new NetworkServiceConfiguration()
+ .BindNamedParameter(GenericType<NetworkServiceOptions.NetworkServicePort>.Class, NetworkServicePort)
+ .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+ NamingConfiguration.NameServerPort)
+ .BindNamedParameter(GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+ NamingConfiguration.NameServerAddress)
+ .BindImplementation(GenericType<ICodecFactory>.Class, NetworkServiceCodecFactory)
+ .Build();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceOptions.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceOptions.cs
new file mode 100644
index 0000000..008751d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkServiceOptions.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ public class NetworkServiceOptions
+ {
+ [NamedParameter("Port of NetworkService", "NsPort", "0")]
+ public class NetworkServicePort : Name<int>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NsConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NsConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NsConnection.cs
new file mode 100644
index 0000000..5465faa
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NsConnection.cs
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ /// <summary>
+ /// Represents a connection between two hosts using the NetworkService.
+ /// </summary>
+ public class NsConnection<T> : IConnection<T>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(NsConnection<T>));
+
+ private IIdentifier _sourceId;
+ private IIdentifier _destId;
+ private INameClient _nameClient;
+ private IRemoteManager<NsMessage<T>> _remoteManager;
+ private Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+ private IObserver<NsMessage<T>> _remoteSender;
+
+ /// <summary>
+ /// Creates a new NsConnection between two hosts.
+ /// </summary>
+ /// <param name="sourceId">The identifier of the sender</param>
+ /// <param name="destId">The identifier of the receiver</param>
+ /// <param name="nameClient">The NameClient used for naming lookup</param>
+ /// <param name="remoteManager">The remote manager used for network communication</param>
+ /// <param name="connectionMap">A cache of opened connections. Will remove itself from
+ /// the cache when the NsConnection is disposed.</param>
+ public NsConnection(
+ IIdentifier sourceId,
+ IIdentifier destId,
+ INameClient nameClient,
+ IRemoteManager<NsMessage<T>> remoteManager,
+ Dictionary<IIdentifier, IConnection<T>> connectionMap)
+ {
+ _sourceId = sourceId;
+ _destId = destId;
+ _nameClient = nameClient;
+ _remoteManager = remoteManager;
+ _connectionMap = connectionMap;
+ }
+
+ /// <summary>
+ /// Opens the connection to the remote host.
+ /// </summary>
+ public void Open()
+ {
+ string destStr = _destId.ToString();
+ LOGGER.Log(Level.Verbose, "Network service opening connection to {0}...", destStr);
+
+ IPEndPoint destAddr = _nameClient.Lookup(_destId.ToString());
+ if (destAddr == null)
+ {
+ throw new RemotingException("Cannot register Identifier with NameService");
+ }
+
+ try
+ {
+ _remoteSender = _remoteManager.GetRemoteObserver(destAddr);
+ LOGGER.Log(Level.Verbose, "Network service completed connection to {0}.", destStr);
+ }
+ catch (SocketException)
+ {
+ LOGGER.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
+ throw;
+ }
+ catch (ObjectDisposedException)
+ {
+ LOGGER.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Writes the object to the remote host.
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ public void Write(T message)
+ {
+ if (_remoteSender == null)
+ {
+ throw new IllegalStateException("NsConnection has not been opened yet.");
+ }
+
+ try
+ {
+ _remoteSender.OnNext(new NsMessage<T>(_sourceId, _destId, message));
+ }
+ catch (IOException)
+ {
+ LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
+ throw;
+ }
+ catch (ObjectDisposedException)
+ {
+ LOGGER.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Closes the connection
+ /// </summary>
+ public void Dispose()
+ {
+ _connectionMap.Remove(_destId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/NetworkService/NsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NsMessage.cs
new file mode 100644
index 0000000..0eba888
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NsMessage.cs
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Wake;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ /// <summary>
+ /// Message sent between NetworkServices
+ /// </summary>
+ /// <typeparam name="T">The type of data being sent</typeparam>
+ public class NsMessage<T>
+ {
+ /// <summary>
+ /// Create a new NsMessage with no data.
+ /// </summary>
+ /// <param name="sourceId">The identifier of the sender</param>
+ /// <param name="destId">The identifier of the receiver</param>
+ public NsMessage(IIdentifier sourceId, IIdentifier destId)
+ {
+ SourceId = sourceId;
+ DestId = destId;
+ Data = new List<T>();
+ }
+
+ /// <summary>
+ /// Create a new NsMessage with data.
+ /// </summary>
+ /// <param name="sourceId">The identifier of the sender</param>
+ /// <param name="destId">The identifier of the receiver</param>
+ /// <param name="message">The message to send</param>
+ public NsMessage(IIdentifier sourceId, IIdentifier destId, T message)
+ {
+ SourceId = sourceId;
+ DestId = destId;
+ Data = new List<T> { message };
+ }
+
+ /// <summary>
+ /// The identifier of the sender of the message.
+ /// </summary>
+ public IIdentifier SourceId { get; private set; }
+
+ /// <summary>
+ /// The identifier of the receiver of the message.
+ /// </summary>
+ public IIdentifier DestId { get; private set; }
+
+ /// <summary>
+ /// A list of data being sent in the message.
+ /// </summary>
+ public List<T> Data { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
new file mode 100644
index 0000000..6824277
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{883CE800-6A6A-4E0A-B7FE-C054F4F2C1DC}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.REEF.Network</RootNamespace>
+ <AssemblyName>Org.Apache.REEF.Network</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <RestorePackages>true</RestorePackages>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
+ </PropertyGroup>
+ <Import Project="$(SolutionDir)\Source\build.props" />
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Microsoft.Hadoop.Avro">
+ <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
+ <Reference Include="Newtonsoft.Json">
+ <HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath>
+ </Reference>
+ <Reference Include="protobuf-net">
+ <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Core">
+ <HintPath>$(PackagesDir)\Rx-Core.$(RxVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Interfaces">
+ <HintPath>$(PackagesDir)\Rx-Interfaces.$(RxVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Runtime.Serialization" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" />
+ <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" />
+ <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" />
+ <Compile Include="Naming\Codec\NamingRegisterResponseCodec.cs" />
+ <Compile Include="Naming\Codec\NamingUnregisterRequestCodec.cs" />
+ <Compile Include="Naming\Contracts\AvroNamingAssignment.cs" />
+ <Compile Include="Naming\Contracts\AvroNamingLookupRequest.cs" />
+ <Compile Include="Naming\Contracts\AvroNamingLookupResponse.cs" />
+ <Compile Include="Naming\Contracts\AvroNamingRegisterRequest.cs" />
+ <Compile Include="Naming\Contracts\AvroNamingUnRegisterRequest.cs" />
+ <Compile Include="Naming\Events\NamingEvent.cs" />
+ <Compile Include="Naming\Events\NamingGetAllRequest.cs" />
+ <Compile Include="Naming\Events\NamingGetAllResponse.cs" />
+ <Compile Include="Naming\Events\NamingLookupRequest.cs" />
+ <Compile Include="Naming\Events\NamingLookupResponse.cs" />
+ <Compile Include="Naming\Events\NamingRegisterRequest.cs" />
+ <Compile Include="Naming\Events\NamingRegisterResponse.cs" />
+ <Compile Include="Naming\Events\NamingUnregisterRequest.cs" />
+ <Compile Include="Naming\Events\NamingUnregisterResponse.cs" />
+ <Compile Include="Naming\INameServer.cs" />
+ <Compile Include="Naming\NameClient.cs" />
+ <Compile Include="Naming\NameLookupClient.cs" />
+ <Compile Include="Naming\NameRegisterClient.cs" />
+ <Compile Include="Naming\NameServer.cs" />
+ <Compile Include="Naming\NamingConfiguration.cs">
+ <SubType>Code</SubType>
+ </Compile>
+ <Compile Include="Naming\NamingConfigurationOptions.cs">
+ <SubType>Code</SubType>
+ </Compile>
+ <Compile Include="Naming\Observers\NamingGetAllRequestObserver.cs" />
+ <Compile Include="Naming\Observers\NamingLookupRequestObserver.cs" />
+ <Compile Include="Naming\Observers\NamingRegisterRequestObserver.cs" />
+ <Compile Include="Naming\Observers\NamingUnregisterRequestObserver.cs" />
+ <Compile Include="NetworkService\Codec\ControlMessageCodec.cs" />
+ <Compile Include="NetworkService\Codec\NsMessageCodec.cs" />
+ <Compile Include="NetworkService\Codec\NsMessageProto.cs" />
+ <Compile Include="NetworkService\ControlMessage.cs" />
+ <Compile Include="NetworkService\IConnection.cs" />
+ <Compile Include="NetworkService\INetworkService.cs" />
+ <Compile Include="NetworkService\NetworkService.cs" />
+ <Compile Include="NetworkService\NetworkServiceConfiguration.cs" />
+ <Compile Include="NetworkService\NetworkServiceOptions.cs" />
+ <Compile Include="NetworkService\NsConnection.cs" />
+ <Compile Include="NetworkService\NsMessage.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Utilities\BlockingCollectionExtensions.cs" />
+ <Compile Include="Utilities\Utils.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+ <Name>Org.Apache.REEF.Tang</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+ <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+ <Name>Org.Apache.REEF.Utilities</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+ <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+ <Name>Org.Apache.REEF.Common</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj">
+ <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project>
+ <Name>Org.Apache.REEF.Driver</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+ <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+ <Name>Org.Apache.REEF.Wake</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <WCFMetadata Include="Service References\" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Network/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..536e986
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Network")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Network")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("b3f5e608-8908-4f06-a87e-5e41c88133ac")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Utilities/BlockingCollectionExtensions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Utilities/BlockingCollectionExtensions.cs b/lang/cs/Org.Apache.REEF.Network/Utilities/BlockingCollectionExtensions.cs
new file mode 100644
index 0000000..9dc057c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Utilities/BlockingCollectionExtensions.cs
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Network.Utilities
+{
+ public static class BlockingCollectionExtensions
+ {
+ /// <summary>
+ /// Removes the given item from the BlockingCollection if it is present.
+ /// If it is not present, it blocks until any item is available in the
+ /// BlockingCollection. It then removes and returns that first available
+ /// item.
+ /// </summary>
+ /// <typeparam name="T">The type of BlockingCollection</typeparam>
+ /// <param name="collection">The BlockingCollection to remove the specified item</param>
+ /// <param name="item">The item to remove from the BlockingCollection, if it exists</param>
+ /// <returns>The specified item, or the first available item if the specified item is
+ /// not present in the BlockingCollection</returns>
+ public static T Take<T>(this BlockingCollection<T> collection, T item)
+ {
+ T ret = default(T);
+ bool foundItem = false;
+ List<T> removedItems = new List<T>();
+
+ // Empty the collection
+ for (int i = 0; i < collection.Count; i++)
+ {
+ T removed;
+ if (collection.TryTake(out removed))
+ {
+ removedItems.Add(removed);
+ }
+ }
+
+ // Add them back to the collection minus the specified item
+ foreach (T removedItem in removedItems)
+ {
+ if (removedItem.Equals(item))
+ {
+ ret = removedItem;
+ foundItem = true;
+ }
+ else
+ {
+ collection.Add(removedItem);
+ }
+ }
+
+ if (!foundItem)
+ {
+ // Error: the element wasn't in the collection
+ throw new InvalidOperationException(item + " not found in blocking collection");
+ }
+
+ return ret;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/Utilities/Utils.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Utilities/Utils.cs b/lang/cs/Org.Apache.REEF.Network/Utilities/Utils.cs
new file mode 100644
index 0000000..bc02b89
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Utilities/Utils.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.IO;
+using Microsoft.Hadoop.Avro;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Network.Utilities
+{
+ internal class Utils
+ {
+ private static Logger LOGGER = Logger.GetLogger(typeof(Utils));
+
+ /// <summary>
+ /// Returns the TaskIdentifier from the Configuration.
+ /// </summary>
+ /// <param name="taskConfiguration">The Configuration object</param>
+ /// <returns>The TaskIdentifier for the given Configuration</returns>
+ public static string GetTaskId(IConfiguration taskConfiguration)
+ {
+ try
+ {
+ IInjector injector = TangFactory.GetTang().NewInjector(taskConfiguration);
+ return injector.GetNamedInstance<TaskConfigurationOptions.Identifier, string>(
+ GenericType<TaskConfigurationOptions.Identifier>.Class);
+ }
+ catch (InjectionException)
+ {
+ LOGGER.Log(Level.Error, "Unable to find task identifier");
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Returns the Context Identifier from the Configuration.
+ /// </summary>
+ /// <param name="contextConfiguration">The Configuration object</param>
+ /// <returns>The TaskIdentifier for the given Configuration</returns>
+ public static string GetContextId(IConfiguration contextConfiguration)
+ {
+ try
+ {
+ IInjector injector = TangFactory.GetTang().NewInjector(contextConfiguration);
+ return injector.GetNamedInstance<ContextConfigurationOptions.ContextIdentifier, string>(
+ GenericType<ContextConfigurationOptions.ContextIdentifier>.Class);
+ }
+ catch (InjectionException)
+ {
+ LOGGER.Log(Level.Error, "Unable to find task identifier");
+ throw;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b6c4e983/lang/cs/Org.Apache.REEF.Network/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/packages.config b/lang/cs/Org.Apache.REEF.Network/packages.config
new file mode 100644
index 0000000..88cf17b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/packages.config
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+ <package id="Microsoft.Hadoop.Avro" version="1.4.0.0" targetFramework="net45" />
+ <package id="Newtonsoft.Json" version="6.0.8" targetFramework="net45" />
+ <package id="protobuf-net" version="2.0.0.668" targetFramework="net45" />
+ <package id="Rx-Core" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
+</packages>
\ No newline at end of file