You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/05/08 03:13:24 UTC
[1/2] incubator-reef git commit: [REEF-273]: Port ranges need to be
configurable in REEF.NET
Repository: incubator-reef
Updated Branches:
refs/heads/master 7a07abcf6 -> fffee8545
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeStart.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeStart.cs
new file mode 100644
index 0000000..57a3b72
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeStart.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote.Parameters
+{
+ [NamedParameter(Documentation = "Port number range start for listening on tcp ports", DefaultValue = "8900")]
+ public class TcpPortRangeStart : Name<int>
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeTryCount.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeTryCount.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeTryCount.cs
new file mode 100644
index 0000000..b740f9b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeTryCount.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote.Parameters
+{
+ [NamedParameter(Documentation = "Count of tries to get a tcp port in the port range", DefaultValue = "1000")]
+ public class TcpPortRangeTryCount : Name<int>
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/TcpPortProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/TcpPortProvider.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/TcpPortProvider.cs
new file mode 100644
index 0000000..5662d7f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/TcpPortProvider.cs
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Diagnostics;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+ public class TcpPortProvider : ITcpPortProvider
+ {
+ private readonly int _tcpPortRangeStart;
+ private readonly int _tcpPortRangeCount;
+ private readonly int _tcpPortRangeTryCount;
+ private readonly int _tcpPortRangeSeed;
+
+ [Inject]
+ internal TcpPortProvider(
+ [Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart,
+ [Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount,
+ [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
+ [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed
+ )
+ {
+ _tcpPortRangeStart = tcpPortRangeStart;
+ _tcpPortRangeCount = tcpPortRangeCount;
+ _tcpPortRangeTryCount = tcpPortRangeTryCount;
+ _tcpPortRangeSeed = tcpPortRangeSeed;
+ }
+
+ IEnumerator<int> IEnumerable<int>.GetEnumerator()
+ {
+ return new TcpPortEnumerator(_tcpPortRangeStart, _tcpPortRangeCount, _tcpPortRangeTryCount,
+ _tcpPortRangeSeed);
+ }
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return new TcpPortEnumerator(_tcpPortRangeStart, _tcpPortRangeCount, _tcpPortRangeTryCount,
+ _tcpPortRangeSeed);
+ }
+
+ private class TcpPortEnumerator : IEnumerator<int>
+ {
+ private readonly int _seed;
+ private Random _random;
+ private readonly int _tcpPortRangeStart;
+ private readonly int _tcpPortRangeCount;
+ private readonly int _tcpPortRangeTryCount;
+ private int _tcpPortRangeAttempt;
+
+ internal TcpPortEnumerator(
+ int tcpPortRangeStart,
+ int tcpPortRangeCount,
+ int tcpPortRangeTryCount,
+ int tcpPortRangeSeed)
+ {
+ _tcpPortRangeStart = tcpPortRangeStart;
+ _tcpPortRangeCount = tcpPortRangeCount;
+ _tcpPortRangeTryCount = tcpPortRangeTryCount;
+ _tcpPortRangeAttempt = 0;
+ _seed = tcpPortRangeSeed;
+ _random = new Random(_seed);
+ }
+
+ int IEnumerator<int>.Current
+ {
+ get { return _random.Next(_tcpPortRangeCount - 1) + 1 + _tcpPortRangeStart; }
+ }
+
+ void IDisposable.Dispose()
+ {
+ }
+
+ object IEnumerator.Current
+ {
+ get { return _random.Next(_tcpPortRangeCount - 1) + 1 + _tcpPortRangeStart; }
+ }
+
+ bool IEnumerator.MoveNext()
+ {
+ return _tcpPortRangeAttempt++ < _tcpPortRangeTryCount;
+ }
+
+ void IEnumerator.Reset()
+ {
+ _random = new Random(_seed);
+ }
+ }
+ }
+
+}
\ No newline at end of file
[2/2] incubator-reef git commit: [REEF-273]: Port ranges need to be
configurable in REEF.NET
Posted by we...@apache.org.
[REEF-273]: Port ranges need to be configurable in REEF.NET
This addresses the issue by adding support for port range configuration.
JIRA:
[REEF-273](https://issues.apache.org/jira/browse/REEF-273)
Pull Request:
This closes #175
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/fffee854
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/fffee854
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/fffee854
Branch: refs/heads/master
Commit: fffee854502b5b462c8a284956a17f56c3c5f863
Parents: 7a07abc
Author: Beysim Sezgin <be...@microsoft.com>
Authored: Tue May 5 16:59:32 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu May 7 18:10:32 2015 -0700
----------------------------------------------------------------------
.../Local/LocalRuntimeClientConfiguration.cs | 25 +++++
.../YARN/YARNClientConfiguration.cs | 26 +++++
.../Io/NamingConfigurationOptions.cs | 3 +-
.../Io/TcpPortConfigurationProvider.cs | 54 +++++++++
.../Org.Apache.REEF.Common.csproj | 1 +
.../Protobuf/ReefProtocol/REEFMessageCodec.cs | 1 +
.../Bridge/Events/AllocatedEvaluator.cs | 17 +--
lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs | 3 +-
.../HelloREEF.cs | 4 -
.../GroupCommunicationTests.cs | 18 ++-
.../NamingService/NameServerTests.cs | 22 ++--
.../NetworkService/NetworkServiceTests.cs | 5 +-
.../Group/Driver/Impl/GroupCommDriver.cs | 38 +------
.../Naming/NameServer.cs | 13 ++-
.../NetworkService/NetworkService.cs | 1 +
.../Util/ReflectionUtilities.cs | 41 +++----
.../RemoteManagerTest.cs | 41 ++++---
.../Org.Apache.REEF.Wake.Tests/TransportTest.cs | 45 +++++---
.../WritableRemoteManagerTest.cs | 2 +-
.../WritableTransportTest.cs | 39 ++++---
.../Org.Apache.REEF.Wake.csproj | 6 +
.../Remote/IRemoteManagerFactory.cs | 11 +-
.../Remote/ITcpPortProvider.cs | 32 ++++++
.../Remote/Impl/DefaultRemoteManager.cs | 17 ++-
.../Remote/Impl/DefaultRemoteManagerFactory.cs | 14 ++-
.../Remote/Impl/TransportServer.cs | 60 +++++++---
.../Remote/Impl/WritableRemoteManager.cs | 4 +-
.../Remote/Impl/WritableRemoteManagerFactory.cs | 6 +-
.../Remote/Impl/WritableTransportServer.cs | 55 ++++++++-
.../Remote/Parameters/TcpPortRangeCount.cs | 29 +++++
.../Remote/Parameters/TcpPortRangeSeed.cs | 28 +++++
.../Remote/Parameters/TcpPortRangeStart.cs | 28 +++++
.../Remote/Parameters/TcpPortRangeTryCount.cs | 28 +++++
.../Remote/TcpPortProvider.cs | 112 +++++++++++++++++++
34 files changed, 658 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs
index e24d2ae..2504a5d 100644
--- a/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs
@@ -22,6 +22,7 @@ using Org.Apache.REEF.Client.Local.Parameters;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Client.Local
{
@@ -52,11 +53,35 @@ namespace Org.Apache.REEF.Client.Local
public static readonly OptionalImpl<IConfigurationProvider> DriverConfigurationProvider =
new OptionalImpl<IConfigurationProvider>();
+ /// <summary>
+ /// Start of the tcp port range for listening.
+ /// </summary>
+ public static readonly OptionalParameter<int> TcpPortRangeStartParameter = new OptionalParameter<int>();
+
+ /// <summary>
+ /// Number of port for the tcp port range for listening.
+ /// </summary>
+ public static readonly OptionalParameter<int> TcpPortRangeCountParameter = new OptionalParameter<int>();
+
+ /// <summary>
+ /// Max number of times we will deliver a port from the tcp port range.
+ /// </summary>
+ public static readonly OptionalParameter<int> TcpPortRangeTryCountParameter = new OptionalParameter<int>();
+
+ /// <summary>
+ /// Seed for the number number for determining which particular port to deliver from the range
+ /// </summary>
+ public static readonly OptionalParameter<int> TcpPortRangeSeedParameter = new OptionalParameter<int>();
+
public static ConfigurationModule ConfigurationModule = new LocalRuntimeClientConfiguration()
.BindImplementation(GenericType<IREEFClient>.Class, GenericType<LocalClient>.Class)
.BindNamedParameter(GenericType<LocalRuntimeDirectory>.Class, RuntimeFolder)
.BindNamedParameter(GenericType<NumberOfEvaluators>.Class, NumberOfEvaluators)
.BindSetEntry(GenericType<DriverConfigurationProviders>.Class, DriverConfigurationProvider)
+ .BindNamedParameter(GenericType<TcpPortRangeStart>.Class, TcpPortRangeStartParameter)
+ .BindNamedParameter(GenericType<TcpPortRangeCount>.Class, TcpPortRangeCountParameter)
+ .BindNamedParameter(GenericType<TcpPortRangeTryCount>.Class, TcpPortRangeTryCountParameter)
+ .BindNamedParameter(GenericType<TcpPortRangeSeed>.Class, TcpPortRangeSeedParameter)
.Build();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
index 94105a0..69c4b29 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
@@ -23,6 +23,7 @@ using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Client.YARN
{
@@ -36,9 +37,34 @@ namespace Org.Apache.REEF.Client.YARN
/// </summary>
public static readonly OptionalImpl<IConfigurationProvider> DriverConfigurationProvider = new OptionalImpl<IConfigurationProvider>();
+ /// <summary>
+ /// Start of the tcp port range for listening.
+ /// </summary>
+ public static readonly OptionalParameter<int> TcpPortRangeStartParameter = new OptionalParameter<int>();
+
+ /// <summary>
+ /// Number of port for the tcp port range for listening.
+ /// </summary>
+ public static readonly OptionalParameter<int> TcpPortRangeCountParameter = new OptionalParameter<int>();
+
+ /// <summary>
+ /// Max number of times we will deliver a port from the tcp port range.
+ /// </summary>
+ public static readonly OptionalParameter<int> TcpPortRangeTryCountParameter = new OptionalParameter<int>();
+
+ /// <summary>
+ /// Seed for the number number for determining which particular port to deliver from the range
+ /// </summary>
+ public static readonly OptionalParameter<int> TcpPortRangeSeedParameter = new OptionalParameter<int>();
+
+
public static ConfigurationModule ConfigurationModule = new YARNClientConfiguration()
.BindImplementation(GenericType<IREEFClient>.Class, GenericType<YARNClient>.Class)
.BindSetEntry(GenericType<DriverConfigurationProviders>.Class, DriverConfigurationProvider)
+ .BindNamedParameter(GenericType<TcpPortRangeStart>.Class, TcpPortRangeStartParameter)
+ .BindNamedParameter(GenericType<TcpPortRangeCount>.Class, TcpPortRangeCountParameter)
+ .BindNamedParameter(GenericType<TcpPortRangeTryCount>.Class, TcpPortRangeTryCountParameter)
+ .BindNamedParameter(GenericType<TcpPortRangeSeed>.Class, TcpPortRangeSeedParameter)
.Build();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs
index 8e7e91d..6f396eb 100644
--- a/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs
@@ -17,6 +17,7 @@
* under the License.
*/
+using System.ComponentModel;
using Org.Apache.REEF.Tang.Annotations;
namespace Org.Apache.REEF.Common.Io
@@ -28,7 +29,7 @@ namespace Org.Apache.REEF.Common.Io
{
}
- [NamedParameter("Port of NameServer")]
+ [NamedParameter("Port of NameServer", DefaultValue = "0")]
public class NameServerPort : Name<int>
{
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Common/Io/TcpPortConfigurationProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Io/TcpPortConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.Common/Io/TcpPortConfigurationProvider.cs
new file mode 100644
index 0000000..16bc014
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Io/TcpPortConfigurationProvider.cs
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Common.Evaluator.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Common.Io
+{
+ public class TcpPortConfigurationProvider : IConfigurationProvider
+ {
+ private readonly IConfiguration _configuration;
+ [Inject]
+ private TcpPortConfigurationProvider(
+ [Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart,
+ [Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount,
+ [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
+ [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeTrySeed)
+ {
+ _configuration = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindIntNamedParam<TcpPortRangeStart>(tcpPortRangeStart.ToString())
+ .BindIntNamedParam<TcpPortRangeCount>(tcpPortRangeCount.ToString())
+ .BindIntNamedParam<TcpPortRangeTryCount>(tcpPortRangeTryCount.ToString())
+ .BindIntNamedParam<TcpPortRangeSeed>(tcpPortRangeTrySeed.ToString())
+ .BindSetEntry<EvaluatorConfigurationProviders, TcpPortConfigurationProvider, IConfigurationProvider>()
+ .Build();
+ }
+
+ IConfiguration IConfigurationProvider.GetConfiguration()
+ {
+ return _configuration;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 77da0f9..7bd929a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -100,6 +100,7 @@ under the License.
<Compile Include="Io\NameAssignment.cs" />
<Compile Include="Io\NamingConfiguration.cs" />
<Compile Include="Io\NamingConfigurationOptions.cs" />
+ <Compile Include="Io\TcpPortConfigurationProvider.cs" />
<Compile Include="ITaskSubmittable.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Protobuf\ReefProtocol\ClientRuntime.pb.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs
index e8c7515..697ea1c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs
@@ -17,6 +17,7 @@
* under the License.
*/
+using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Remote;
namespace Org.Apache.REEF.Common.Protobuf.ReefProtocol
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
index 05f22ba..3655593 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
@@ -81,7 +81,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
{
LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndTask");
- contextConfiguration = MergeContextConfiguration(contextConfiguration);
+ //TODO: Change this to service configuration when REEF-289(https://issues.apache.org/jira/browse/REEF-289) is fixed.
+ taskConfiguration = MergeWithConfigurationProviders(taskConfiguration);
string context = _serializer.ToString(contextConfiguration);
string task = _serializer.ToString(taskConfiguration);
@@ -94,8 +95,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
{
LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndService");
-
- contextConfiguration = MergeContextConfiguration(contextConfiguration);
+
string context = _serializer.ToString(contextConfiguration);
string service = _serializer.ToString(serviceConfiguration);
@@ -109,7 +109,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
{
LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndServiceAndTask");
- contextConfiguration = MergeContextConfiguration(contextConfiguration);
+ //TODO: Change this to service configuration when REEF-289(https://issues.apache.org/jira/browse/REEF-289) is fixed.
+ taskConfiguration = MergeWithConfigurationProviders(taskConfiguration);
string context = _serializer.ToString(contextConfiguration);
string service = _serializer.ToString(serviceConfiguration);
string task = _serializer.ToString(taskConfiguration);
@@ -179,17 +180,17 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
}
}
- private IConfiguration MergeContextConfiguration(IConfiguration contextConfiguration)
+ private IConfiguration MergeWithConfigurationProviders(IConfiguration configuration)
{
- IConfiguration contextConfig = contextConfiguration;
+ IConfiguration config = configuration;
if (_configurationProviders != null)
{
foreach (var configurationProvider in _configurationProviders)
{
- contextConfig = Configurations.Merge(contextConfig, configurationProvider.GetConfiguration());
+ config = Configurations.Merge(config, configurationProvider.GetConfiguration());
}
}
- return contextConfig;
+ return config;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs
index 5e094f6..f97438d 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs
@@ -45,6 +45,7 @@ using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.Time.Runtime;
using Org.Apache.REEF.Wake.Time.Runtime.Event;
+using Org.Apache.REEF.Wake.Util;
namespace Org.Apache.REEF.Evaluator
{
@@ -109,7 +110,7 @@ namespace Org.Apache.REEF.Evaluator
Optional<ServiceConfiguration> rootServiceConfig = _evaluatorConfig.RootServiceConfiguration;
// remoteManager used as client-only in evaluator
- IRemoteManager<REEFMessage> remoteManager = _injector.GetInstance<IRemoteManagerFactory>().GetInstance((new REEFMessageCodec()));
+ IRemoteManager<REEFMessage> remoteManager = _injector.GetInstance<IRemoteManagerFactory>().GetInstance(new REEFMessageCodec());
IRemoteIdentifier remoteId = new SocketRemoteIdentifier(NetUtilities.ParseIpEndpoint(rId));
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 1e74c9f..87a48e3 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -18,9 +18,6 @@
*/
using System;
-using System.Linq;
-using System.Net;
-using System.Net.Sockets;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.Local;
using Org.Apache.REEF.Client.YARN;
@@ -30,7 +27,6 @@ using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
namespace Org.Apache.REEF.Examples.HelloREEF
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index 26fa16c..7a6b5c1 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -25,6 +25,7 @@ using System.Net;
using System.Reactive;
using System.Text;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Examples.MachineLearning.KMeans;
using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs;
@@ -40,6 +41,7 @@ using Org.Apache.REEF.Network.Group.Topology;
using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Network.Tests.NamingService;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations.Configuration;
@@ -58,7 +60,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
[TestMethod]
public void TestSender()
{
- using (NameServer nameServer = new NameServer(0))
+ using (var nameServer = NameServerTests.BuildNameServer())
{
IPEndPoint endpoint = nameServer.LocalEndpoint;
BlockingCollection<GroupCommunicationMessage> messages1 = new BlockingCollection<GroupCommunicationMessage>();
@@ -261,8 +263,6 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
[TestMethod]
public void TestBroadcastOperator()
{
- NameServer nameServer = new NameServer(0);
-
string groupName = "group1";
string operatorName = "broadcast";
string masterTaskId = "task0";
@@ -295,7 +295,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
[TestMethod]
public void TestBroadcastOperatorWithDefaultCodec()
{
- NameServer nameServer = new NameServer(0);
+ INameServer nameServer = NameServerTests.BuildNameServer();
string groupName = "group1";
string operatorName = "broadcast";
@@ -745,9 +745,15 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
public static NetworkService<GroupCommunicationMessage> BuildNetworkService(
IPEndPoint nameServerEndpoint, IObserver<NsMessage<GroupCommunicationMessage>> handler)
{
- var remoteManagerFactory = TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>();
+ var injector = TangFactory.GetTang().NewInjector();
+ var remoteManagerFactory = injector.GetInstance<IRemoteManagerFactory>();
return new NetworkService<GroupCommunicationMessage>(
- 0, handler, new StringIdentifierFactory(), new GroupCommunicationMessageCodec(), new NameClient(nameServerEndpoint.Address.ToString(), nameServerEndpoint.Port), remoteManagerFactory);
+ 0, handler, new StringIdentifierFactory(),
+ new GroupCommunicationMessageCodec(),
+ new NameClient(nameServerEndpoint.Address.ToString(),
+ nameServerEndpoint.Port),
+ remoteManagerFactory);
+
}
private GroupCommunicationMessage CreateGcm(string message, string from, string to)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs
index fd3002c..6523e84 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs
@@ -28,6 +28,7 @@ using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Network.Tests.NamingService
{
@@ -37,7 +38,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService
[TestMethod]
public void TestNameServerNoRequests()
{
- using (var server = new NameServer(0))
+ using (var server = BuildNameServer())
{
}
}
@@ -45,7 +46,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService
[TestMethod]
public void TestNameServerNoRequestsTwoClients()
{
- using (var server = new NameServer(0))
+ using (var server = BuildNameServer())
{
var nameClient = new NameClient(server.LocalEndpoint);
var nameClient2 = new NameClient(server.LocalEndpoint);
@@ -57,7 +58,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService
[TestMethod]
public void TestNameServerNoRequestsTwoClients2()
{
- using (var server = new NameServer(0))
+ using (var server = BuildNameServer())
{
var nameClient = new NameClient(server.LocalEndpoint);
var nameClient2 = new NameClient(server.LocalEndpoint);
@@ -69,7 +70,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService
[TestMethod]
public void TestNameServerMultipleRequestsTwoClients()
{
- using (var server = new NameServer(0))
+ using (var server = BuildNameServer())
{
var nameClient = new NameClient(server.LocalEndpoint);
var nameClient2 = new NameClient(server.LocalEndpoint);
@@ -192,7 +193,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService
{
int oldPort = 6666;
int newPort = 6662;
- INameServer server = new NameServer(oldPort);
+ INameServer server = BuildNameServer(oldPort);
using (INameClient client = BuildNameClient(server.LocalEndpoint))
{
@@ -203,7 +204,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService
server.Dispose();
- server = new NameServer(newPort);
+ server = BuildNameServer(newPort);
client.Restart(server.LocalEndpoint);
client.Register("b", endpoint);
@@ -217,7 +218,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService
public void TestConstructorInjection()
{
int port = 6666;
- using (INameServer server = new NameServer(port))
+ using (INameServer server = BuildNameServer(port))
{
IConfiguration nameClientConfiguration = NamingConfiguration.ConfigurationModule
.Set(NamingConfiguration.NameServerAddress, server.LocalEndpoint.Address.ToString())
@@ -232,12 +233,11 @@ namespace Org.Apache.REEF.Network.Tests.NamingService
}
}
- private INameServer BuildNameServer()
+ public static INameServer BuildNameServer(int listenPort = 0)
{
var builder = TangFactory.GetTang()
- .NewConfigurationBuilder()
- .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
- GenericType<NamingConfigurationOptions.NameServerPort>.Class, "0");
+ .NewConfigurationBuilder()
+ .BindIntNamedParam<NamingConfigurationOptions.NameServerPort>(listenPort.ToString());
return TangFactory.GetTang().NewInjector(builder.Build()).GetInstance<INameServer>();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs
index e52a082..762cc48 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs
@@ -26,6 +26,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.Tests.NamingService;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Util;
@@ -47,7 +48,7 @@ namespace Org.Apache.REEF.Network.Tests.NetworkService
BlockingCollection<string> queue = new BlockingCollection<string>();
- using (INameServer nameServer = new NameServer(0))
+ using (var nameServer = NameServerTests.BuildNameServer())
{
IPEndPoint endpoint = nameServer.LocalEndpoint;
int nameServerPort = endpoint.Port;
@@ -84,7 +85,7 @@ namespace Org.Apache.REEF.Network.Tests.NetworkService
BlockingCollection<string> queue1 = new BlockingCollection<string>();
BlockingCollection<string> queue2 = new BlockingCollection<string>();
- using (INameServer nameServer = new NameServer(0))
+ using (var nameServer = NameServerTests.BuildNameServer())
{
IPEndPoint endpoint = nameServer.LocalEndpoint;
int nameServerPort = endpoint.Port;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index 42394ab..a1a2548 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -60,37 +60,9 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
private readonly Dictionary<string, ICommunicationGroupDriver> _commGroups;
private readonly AvroConfigurationSerializer _configSerializer;
- private readonly NameServer _nameServer;
-
- /// <summary>
- /// Create a new GroupCommunicationDriver object.
- /// </summary>
- /// <param name="driverId">Identifer for the REEF driver</param>
- /// <param name="masterTaskId">Identifer for Group Communication master task</param>
- /// <param name="fanOut">fanOut for tree topology</param>
- /// <param name="configSerializer">Used to serialize task configuration</param>
- [System.Obsolete("user the other constructor")]
- [Inject]
- public GroupCommDriver(
- [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId,
- [Parameter(typeof(GroupCommConfigurationOptions.MasterTaskId))] string masterTaskId,
- [Parameter(typeof(GroupCommConfigurationOptions.FanOut))] int fanOut,
- AvroConfigurationSerializer configSerializer)
- {
- _driverId = driverId;
- _contextIds = -1;
- _fanOut = fanOut;
- MasterTaskId = masterTaskId;
-
- _configSerializer = configSerializer;
- _commGroups = new Dictionary<string, ICommunicationGroupDriver>();
- _nameServer = new NameServer(0);
-
- IPEndPoint localEndpoint = _nameServer.LocalEndpoint;
- _nameServerAddr = localEndpoint.Address.ToString();
- _nameServerPort = localEndpoint.Port;
- }
+ private readonly INameServer _nameServer;
+
/// <summary>
/// Create a new GroupCommunicationDriver object.
/// </summary>
@@ -100,6 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
/// <param name="groupName">default communication group name</param>
/// <param name="numberOfTasks">Number of tasks in the default group</param>
/// <param name="configSerializer">Used to serialize task configuration</param>
+ /// <param name="nameServer">Used to map names to ip adresses</param>
[Inject]
public GroupCommDriver(
[Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId,
@@ -107,7 +80,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
[Parameter(typeof(GroupCommConfigurationOptions.FanOut))] int fanOut,
[Parameter(typeof(GroupCommConfigurationOptions.GroupName))] string groupName,
[Parameter(typeof(GroupCommConfigurationOptions.NumberOfTasks))] int numberOfTasks,
- AvroConfigurationSerializer configSerializer)
+ AvroConfigurationSerializer configSerializer,
+ INameServer nameServer)
{
_driverId = driverId;
_contextIds = -1;
@@ -117,7 +91,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
_configSerializer = configSerializer;
_commGroups = new Dictionary<string, ICommunicationGroupDriver>();
- _nameServer = new NameServer(0);
+ _nameServer = nameServer;
IPEndPoint localEndpoint = _nameServer.LocalEndpoint;
_nameServerAddr = localEndpoint.Address.ToString();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs
index 0175e1b..47d17f5 100644
--- a/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs
@@ -32,6 +32,7 @@ using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.RX;
using Org.Apache.REEF.Wake.RX.Impl;
+using Org.Apache.REEF.Wake.Util;
namespace Org.Apache.REEF.Network.Naming
{
@@ -50,8 +51,14 @@ namespace Org.Apache.REEF.Network.Naming
/// Create a new NameServer to run on the specified port.
/// </summary>
/// <param name="port">The port to listen for incoming connections on.</param>
+ /// <param name="tcpPortProvider">If port is 0, this interface provides
+ /// a port range to try.
+ /// </param>
+ [Obsolete("Please use TANG injection instead.")]
[Inject]
- public NameServer([Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port)
+ public NameServer(
+ [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port,
+ ITcpPortProvider tcpPortProvider)
{
IObserver<TransportEvent<NamingEvent>> handler = CreateServerHandler();
_idToAddrMap = new Dictionary<string, IPEndPoint>();
@@ -59,7 +66,9 @@ namespace Org.Apache.REEF.Network.Naming
// Start transport server, get listening IP endpoint
_logger.Log(Level.Info, "Starting naming server");
- _server = new TransportServer<NamingEvent>(port, handler, codec);
+ _server = new TransportServer<NamingEvent>(
+ new IPEndPoint(NetworkUtils.LocalIPAddress, port), handler,
+ codec, tcpPortProvider);
_server.Run();
LocalEndpoint = _server.LocalEndpoint;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
index 43e55c1..1b796a3 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
@@ -58,6 +58,7 @@ namespace Org.Apache.REEF.Network.NetworkService
/// <param name="idFactory">The factory used to create IIdentifiers</param>
/// <param name="codec">The codec used for serialization</param>
/// <param name="remoteManagerFactory">Used to instantiate remote manager instances.</param>
+ /// <param name="tcpPortProvider">Provides ports for tcp listeners.</param>
[Inject]
public NetworkService(
[Parameter(typeof(NetworkServiceOptions.NetworkServicePort))] int nsPort,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs b/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs
index 29bd754..569a564 100644
--- a/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs
+++ b/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs
@@ -170,7 +170,7 @@ namespace Org.Apache.REEF.Tang.Util
if (c.IsInterface)
{
- workQueue.Add(typeof (object));
+ workQueue.Add(typeof(object));
}
return workQueue;
@@ -209,39 +209,39 @@ namespace Org.Apache.REEF.Tang.Util
/// <exception cref="System.NotSupportedException">Encountered unknown primitive type!</exception>
public static Type BoxClass(Type c)
{
- if (c.IsPrimitive && c != typeof (Type))
+ if (c.IsPrimitive && c != typeof(Type))
{
- if (c == typeof (bool))
+ if (c == typeof(bool))
{
- return typeof (Boolean);
+ return typeof(Boolean);
}
- else if (c == typeof (byte))
+ else if (c == typeof(byte))
{
- return typeof (Byte);
+ return typeof(Byte);
}
- else if (c == typeof (char))
+ else if (c == typeof(char))
{
- return typeof (Char);
+ return typeof(Char);
}
- else if (c == typeof (short))
+ else if (c == typeof(short))
{
- return typeof (Int16);
+ return typeof(Int16);
}
- else if (c == typeof (int))
+ else if (c == typeof(int))
{
- return typeof (Int32);
+ return typeof(Int32);
}
- else if (c == typeof (long))
+ else if (c == typeof(long))
{
- return typeof (Int64);
+ return typeof(Int64);
}
- else if (c == typeof (float))
+ else if (c == typeof(float))
{
- return typeof (Single);
+ return typeof(Single);
}
- else if (c == typeof (double))
+ else if (c == typeof(double))
{
- return typeof (Double);
+ return typeof(Double);
}
else
{
@@ -371,7 +371,8 @@ namespace Org.Apache.REEF.Tang.Util
}
if (t == null)
{
- Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ApplicationException("Not able to get Type from the name provided: " + name), LOGGER);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(
+ new ApplicationException("Not able to get Type from the name provided: " + name), LOGGER);
}
return t;
@@ -480,7 +481,7 @@ namespace Org.Apache.REEF.Tang.Util
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
}
- return args[0];
+ return args[0];
}
if (ImplementName(type)) //Implement Name<> but no [NamedParameter] attribute
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
index f24eb29..c0ffa11 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
@@ -42,8 +42,8 @@ namespace Org.Apache.REEF.Wake.Tests
BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager1 = GetRemoteManager())
+ using (var remoteManager2 = GetRemoteManager())
{
var observer = Observer.Create<string>(queue.Add);
IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
@@ -65,14 +65,13 @@ namespace Org.Apache.REEF.Wake.Tests
[TestMethod]
public void TestOneWayCommunicationClientOnly()
{
- int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000);
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
using (var remoteManager1 = _remoteManagerFactory.GetInstance(new StringCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, listeningPort, new StringCodec()))
+ using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, new StringCodec()))
{
IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0);
var observer = Observer.Create<string>(queue.Add);
@@ -101,8 +100,8 @@ namespace Org.Apache.REEF.Wake.Tests
List<string> events1 = new List<string>();
List<string> events2 = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager1 = GetRemoteManager())
+ using (var remoteManager2 = GetRemoteManager())
{
// Register observers for remote manager 1 and remote manager 2
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
@@ -146,9 +145,9 @@ namespace Org.Apache.REEF.Wake.Tests
BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager3 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager1 = GetRemoteManager())
+ using (var remoteManager2 = GetRemoteManager())
+ using (var remoteManager3 = GetRemoteManager())
{
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
var observer = Observer.Create<string>(queue.Add);
@@ -184,9 +183,9 @@ namespace Org.Apache.REEF.Wake.Tests
List<string> events2 = new List<string>();
List<string> events3 = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager3 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager1 = GetRemoteManager())
+ using (var remoteManager2 = GetRemoteManager())
+ using (var remoteManager3 = GetRemoteManager())
{
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
@@ -244,8 +243,8 @@ namespace Org.Apache.REEF.Wake.Tests
BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager1 = GetRemoteManager())
+ using (var remoteManager2 = GetRemoteManager())
{
// Register handler for when remote manager 2 receives events; respond
// with an ack
@@ -285,8 +284,8 @@ namespace Org.Apache.REEF.Wake.Tests
BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager1 = GetRemoteManager())
+ using (var remoteManager2 = GetRemoteManager())
{
// RemoteManager2 listens and records events of type IRemoteEvent<string>
var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message));
@@ -314,8 +313,8 @@ namespace Org.Apache.REEF.Wake.Tests
BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager1 = GetRemoteManager())
+ using (var remoteManager2 = GetRemoteManager())
{
var observer = Observer.Create<string>(queue.Add);
IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
@@ -337,5 +336,11 @@ namespace Org.Apache.REEF.Wake.Tests
Assert.AreEqual(4, events.Count);
}
+
+ private IRemoteManager<string> GetRemoteManager()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+ return _remoteManagerFactory.GetInstance(listeningAddress, new StringCodec());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
index 3e67e0d..c1c65b2 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
@@ -23,8 +23,10 @@ using System.Net;
using System.Reactive;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
using Org.Apache.REEF.Wake.Util;
namespace Org.Apache.REEF.Wake.Tests
@@ -32,23 +34,25 @@ namespace Org.Apache.REEF.Wake.Tests
[TestClass]
public class TransportTest
{
+ private readonly IPAddress _localIpAddress = IPAddress.Parse("127.0.0.1");
+ private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940);
[TestMethod]
public void TestTransportServer()
{
ICodec<string> codec = new StringCodec();
- int port = NetworkUtils.GenerateRandomPort(6000, 7000);
BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+
+ IPEndPoint endpoint = new IPEndPoint(_localIpAddress, 0);
var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
- using (var server = new TransportServer<string>(endpoint, remoteHandler, codec))
+ using (var server = new TransportServer<string>(endpoint, remoteHandler, codec, _tcpPortProvider))
{
server.Run();
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port);
using (var client = new TransportClient<string>(remoteEndpoint, codec))
{
client.Send("Hello");
@@ -68,19 +72,18 @@ namespace Org.Apache.REEF.Wake.Tests
public void TestTransportServerEvent()
{
ICodec<TestEvent> codec = new TestEventCodec();
- int port = NetworkUtils.GenerateRandomPort(6000, 7000);
BlockingCollection<TestEvent> queue = new BlockingCollection<TestEvent>();
List<TestEvent> events = new List<TestEvent>();
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+ IPEndPoint endpoint = new IPEndPoint(_localIpAddress, 0);
var remoteHandler = Observer.Create<TransportEvent<TestEvent>>(tEvent => queue.Add(tEvent.Data));
- using (var server = new TransportServer<TestEvent>(endpoint, remoteHandler, codec))
+ using (var server = new TransportServer<TestEvent>(endpoint, remoteHandler, codec, _tcpPortProvider))
{
server.Run();
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port);
using (var client = new TransportClient<TestEvent>(remoteEndpoint, codec))
{
client.Send(new TestEvent("Hello"));
@@ -100,8 +103,7 @@ namespace Org.Apache.REEF.Wake.Tests
public void TestTransportSenderStage()
{
ICodec<string> codec = new StringCodec();
- int port = NetworkUtils.GenerateRandomPort(6000, 7000);
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+ IPEndPoint endpoint = new IPEndPoint(_localIpAddress, 0);
List<string> events = new List<string>();
BlockingCollection<string> queue = new BlockingCollection<string>();
@@ -109,12 +111,12 @@ namespace Org.Apache.REEF.Wake.Tests
// Server echoes the message back to the client
var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => tEvent.Link.Write(tEvent.Data));
- using (TransportServer<string> server = new TransportServer<string>(endpoint, remoteHandler, codec))
+ using (TransportServer<string> server = new TransportServer<string>(endpoint, remoteHandler, codec, _tcpPortProvider))
{
server.Run();
var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port);
using (var client = new TransportClient<string>(remoteEndpoint, codec, clientHandler))
{
client.Send("Hello");
@@ -134,8 +136,7 @@ namespace Org.Apache.REEF.Wake.Tests
public void TestRaceCondition()
{
ICodec<string> codec = new StringCodec();
- int port = NetworkUtils.GenerateRandomPort(6000, 7000);
-
+ var port = 0;
BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
int numEventsExpected = 150;
@@ -143,7 +144,7 @@ namespace Org.Apache.REEF.Wake.Tests
IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
- using (var server = new TransportServer<string>(endpoint, remoteHandler, codec))
+ using (var server = new TransportServer<string>(endpoint, remoteHandler, codec, _tcpPortProvider))
{
server.Run();
@@ -151,7 +152,7 @@ namespace Org.Apache.REEF.Wake.Tests
{
Task.Run(() =>
{
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port);
using (var client = new TransportClient<string>(remoteEndpoint, codec))
{
client.Send("Hello");
@@ -197,5 +198,17 @@ namespace Org.Apache.REEF.Wake.Tests
return new TestEvent(new StringCodec().Decode(data));
}
}
+
+
+ private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd)
+ {
+ var configuration = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindImplementation<ITcpPortProvider, TcpPortProvider>()
+ .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
+ .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - portRangeStart + 1).ToString())
+ .Build();
+ return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
index 6f7baf9..80bb78b 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
@@ -79,7 +79,7 @@ namespace Org.Apache.REEF.Wake.Tests
[TestMethod]
public void TestWritableOneWayCommunicationClientOnly()
{
- int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000);
+ int listeningPort = NetworkUtils.GenerateRandomPort(8900, 8940);
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
index 03de24e..914a2aa 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
@@ -26,8 +26,11 @@ using System.Reactive;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
using Org.Apache.REEF.Wake.Util;
namespace Org.Apache.REEF.Wake.Tests
@@ -40,6 +43,8 @@ namespace Org.Apache.REEF.Wake.Tests
[Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
public class WritableTransportTest
{
+ private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940);
+
/// <summary>
/// Tests whether WritableTransportServer receives
/// string messages from WritableTransportClient
@@ -47,19 +52,17 @@ namespace Org.Apache.REEF.Wake.Tests
[TestMethod]
public void TestWritableTransportServer()
{
- int port = NetworkUtils.GenerateRandomPort(6000, 7000);
-
BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
List<string> events = new List<string>();
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
- using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler))
+ using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider))
{
server.Run();
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
using (var client = new WritableTransportClient<WritableString>(remoteEndpoint))
{
client.Send(new WritableString("Hello"));
@@ -85,8 +88,8 @@ namespace Org.Apache.REEF.Wake.Tests
[TestMethod]
public void TestWritableTransportSenderStage()
{
- int port = NetworkUtils.GenerateRandomPort(6000, 7000);
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
List<string> events = new List<string>();
BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
@@ -94,12 +97,12 @@ namespace Org.Apache.REEF.Wake.Tests
// Server echoes the message back to the client
var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => tEvent.Link.Write(tEvent.Data));
- using (WritableTransportServer<WritableString> server = new WritableTransportServer<WritableString>(endpoint, remoteHandler))
+ using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider))
{
server.Run();
var clientHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, clientHandler))
{
client.Send(new WritableString("Hello"));
@@ -126,16 +129,14 @@ namespace Org.Apache.REEF.Wake.Tests
[TestMethod]
public void TestWritableRaceCondition()
{
- int port = NetworkUtils.GenerateRandomPort(6000, 7000);
-
BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
List<string> events = new List<string>();
int numEventsExpected = 150;
- IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
- using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler))
+ using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider))
{
server.Run();
@@ -143,7 +144,7 @@ namespace Org.Apache.REEF.Wake.Tests
{
Task.Run(() =>
{
- IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
using (var client = new WritableTransportClient<WritableString>(remoteEndpoint))
{
client.Send(new WritableString("Hello"));
@@ -162,5 +163,15 @@ namespace Org.Apache.REEF.Wake.Tests
Assert.AreEqual(numEventsExpected, events.Count);
}
+
+ private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd)
+ {
+ var configuration = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindImplementation<ITcpPortProvider, TcpPortProvider>()
+ .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
+ .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - portRangeStart + 1).ToString())
+ .Build();
+ return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index a62d524..53ffd65 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -62,6 +62,11 @@ under the License.
<Compile Include="IObserverFactory.cs" />
<Compile Include="IStage.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Remote\ITcpPortProvider.cs" />
+ <Compile Include="Remote\Parameters\TcpPortRangeCount.cs" />
+ <Compile Include="Remote\Parameters\TcpPortRangeSeed.cs" />
+ <Compile Include="Remote\Parameters\TcpPortRangeStart.cs" />
+ <Compile Include="Remote\Parameters\TcpPortRangeTryCount.cs" />
<Compile Include="Remote\IDataReader.cs" />
<Compile Include="Remote\IDataWriter.cs" />
<Compile Include="Remote\Impl\StreamDataReader.cs" />
@@ -114,6 +119,7 @@ under the License.
<Compile Include="Remote\Proto\WakeRemoteProtos.cs" />
<Compile Include="Remote\RemoteConfiguration.cs" />
<Compile Include="Remote\RemoteRuntimeException.cs" />
+ <Compile Include="Remote\TcpPortProvider.cs" />
<Compile Include="Remote\TypeCache.cs" />
<Compile Include="RX\AbstractObserver.cs" />
<Compile Include="RX\AbstractRxStage.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs
index 7d8041b..36a1adc 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs
@@ -24,7 +24,7 @@ using Org.Apache.REEF.Wake.Impl;
namespace Org.Apache.REEF.Wake.Remote
{
/// <summary>
- /// Creates new intsances of IRemoteManager.
+ /// Creates new instances of IRemoteManager.
/// </summary>
[DefaultImplementation(typeof(DefaultRemoteManagerFactory))]
public interface IRemoteManagerFactory
@@ -39,6 +39,15 @@ namespace Org.Apache.REEF.Wake.Remote
IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec);
/// <summary>
+ /// Constructs a DefaultRemoteManager listening on the specified address and any
+ /// available port.
+ /// </summary>
+ /// <param name="localAddress">The address to listen on</param>
+ /// <param name="codec">The codec used for serializing messages</param>
+ IRemoteManager<T> GetInstance<T>(IPAddress localAddress, ICodec<T> codec);
+
+
+ /// <summary>
/// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
/// </summary>
/// <param name="codec">The codec used for serializing messages</param>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpPortProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpPortProvider.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpPortProvider.cs
new file mode 100644
index 0000000..8783f01
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpPortProvider.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+ /// <summary>
+ /// Provides port numbers for tcp listeners
+ /// </summary>
+ [DefaultImplementation(typeof(TcpPortProvider))]
+ public interface ITcpPortProvider : IEnumerable<int>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
index 210ebcf..de577d1 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs
@@ -20,6 +20,7 @@
using System;
using System.Collections.Generic;
using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Util;
@@ -43,8 +44,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// </summary>
/// <param name="localAddress">The address to listen on</param>
/// <param name="codec">The codec used for serializing messages</param>
+ /// <param name="tcpPortProvider">provides port numbers to listen</param>
[Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
- public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec) : this(localAddress, 0, codec)
+ public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec, ITcpPortProvider tcpPortProvider) :
+ this(localAddress, 0, codec, tcpPortProvider)
{
}
@@ -53,8 +56,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// </summary>
/// <param name="localEndpoint">The endpoint to listen on</param>
/// <param name="codec">The codec used for serializing messages</param>
+ /// <param name="tcpPortProvider">provides port numbers to listen</param>
[Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
- public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec)
+ public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec, ITcpPortProvider tcpPortProvider)
{
if (localEndpoint == null)
{
@@ -74,7 +78,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
_cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
// Begin to listen for incoming messages
- _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
+ _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec,
+ tcpPortProvider);
_server.Run();
LocalEndpoint = _server.LocalEndpoint;
@@ -88,8 +93,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// <param name="localAddress">The address to listen on</param>
/// <param name="port">The port to listen on</param>
/// <param name="codec">The codec used for serializing messages</param>
+ /// <param name="tcpPortProvider">provides port numbers to listen</param>
[Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
- public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec)
+ public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec, ITcpPortProvider tcpPortProvider)
{
if (localAddress == null)
{
@@ -111,7 +117,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
// Begin to listen for incoming messages
- _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
+ _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec,
+ tcpPortProvider);
_server.Run();
LocalEndpoint = _server.LocalEndpoint;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
index 38a020f..54728fc 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs
@@ -29,16 +29,26 @@ namespace Org.Apache.REEF.Wake.Impl
/// </summary>
internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory
{
+ private readonly ITcpPortProvider _tcpPortProvider;
[Inject]
- private DefaultRemoteManagerFactory()
+ private DefaultRemoteManagerFactory(ITcpPortProvider tcpPortProvider)
{
+ _tcpPortProvider = tcpPortProvider;
}
public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec)
{
#pragma warning disable 618
// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
- return new DefaultRemoteManager<T>(localAddress, port, codec);
+ return new DefaultRemoteManager<T>(localAddress, port, codec, _tcpPortProvider);
+#pragma warning restore 618
+ }
+
+ public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, ICodec<T> codec)
+ {
+#pragma warning disable 618
+ // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+ return new DefaultRemoteManager<T>(localAddress, 0, codec, _tcpPortProvider);
#pragma warning restore 618
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
index 743bac5..8cd350e 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
@@ -22,6 +22,7 @@ using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Util;
@@ -34,9 +35,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>));
- private readonly TcpListener _listener;
+ private TcpListener _listener;
private readonly CancellationTokenSource _cancellationSource;
private readonly IObserver<TransportEvent<T>> _remoteObserver;
+ private readonly ITcpPortProvider _tcpPortProvider;
private readonly ICodec<T> _codec;
private bool _disposed;
private Task _serverTask;
@@ -46,33 +48,22 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// Listens on the specified remote endpoint. When it recieves a remote
/// event, it will envoke the specified remote handler.
/// </summary>
- /// <param name="port">Port to listen on</param>
- /// <param name="remoteHandler">The handler to invoke when receiving incoming
- /// remote messages</param>
- /// <param name="codec">The codec to encode/decode"</param>
- public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec)
- : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec)
- {
- }
-
- /// <summary>
- /// Constructs a TransportServer to listen for remote events.
- /// Listens on the specified remote endpoint. When it recieves a remote
- /// event, it will envoke the specified remote handler.
- /// </summary>
/// <param name="localEndpoint">Endpoint to listen on</param>
/// <param name="remoteHandler">The handler to invoke when receiving incoming
/// remote messages</param>
/// <param name="codec">The codec to encode/decode"</param>
+ /// <param name="tcpPortProvider">provides port numbers to listen</param>
public TransportServer(IPEndPoint localEndpoint,
IObserver<TransportEvent<T>> remoteHandler,
- ICodec<T> codec)
+ ICodec<T> codec,
+ ITcpPortProvider tcpPortProvider)
{
_listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
_remoteObserver = remoteHandler;
_cancellationSource = new CancellationTokenSource();
_cancellationSource.Token.ThrowIfCancellationRequested();
_codec = codec;
+ _tcpPortProvider = tcpPortProvider;
_disposed = false;
}
@@ -89,10 +80,45 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// </summary>
public void Run()
{
- _listener.Start();
+ if (LocalEndpoint.Port == 0)
+ {
+ FindAPortAndStartListener();
+ }
+ else
+ {
+ _listener.Start();
+ }
+
_serverTask = Task.Run(() => StartServer());
}
+ private void FindAPortAndStartListener()
+ {
+ var foundAPort = false;
+ var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
+ for (var enumerator = _tcpPortProvider.GetEnumerator();
+ !foundAPort && enumerator.MoveNext();
+ )
+ {
+ _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
+ try
+ {
+ _listener.Start();
+ foundAPort = true;
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ }
+ if (!foundAPort)
+ {
+ Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
+ }
+ LOGGER.Log(Level.Info,
+ String.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
+ }
+
/// <summary>
/// Close the TransportServer and all open connections
/// </summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
index 285db71..0a9ead3 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
@@ -45,7 +45,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// <param name="localAddress">The address to listen on</param>
/// <param name="port">The port to listen on</param>
[Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
- public WritableRemoteManager(IPAddress localAddress, int port)
+ public WritableRemoteManager(IPAddress localAddress, int port, ITcpPortProvider tcpPortProvider)
{
if (localAddress == null)
{
@@ -62,7 +62,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
// Begin to listen for incoming messages
- _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer);
+ _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer, tcpPortProvider);
_server.Run();
LocalEndpoint = _server.LocalEndpoint;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
index 6d3c4ad..4beb844 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
@@ -31,16 +31,18 @@ namespace Org.Apache.REEF.Wake.Impl
[Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
public sealed class WritableRemoteManagerFactory
{
+ private readonly ITcpPortProvider _tcpPortProvider;
[Inject]
- private WritableRemoteManagerFactory()
+ private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider)
{
+ _tcpPortProvider = tcpPortProvider;
}
public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
{
#pragma warning disable 618
// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
- return new WritableRemoteManager<T>(localAddress, port);
+ return new WritableRemoteManager<T>(localAddress, port, _tcpPortProvider);
#pragma warning disable 618
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
index 05a520d..90cfdd7 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
@@ -22,6 +22,7 @@ using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Util;
@@ -36,9 +37,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof (TransportServer<>));
- private readonly TcpListener _listener;
+ private TcpListener _listener;
private readonly CancellationTokenSource _cancellationSource;
private readonly IObserver<TransportEvent<T>> _remoteObserver;
+ private readonly ITcpPortProvider _tcpPortProvider;
private bool _disposed;
private Task _serverTask;
@@ -50,8 +52,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// <param name="port">Port to listen on</param>
/// <param name="remoteHandler">The handler to invoke when receiving incoming
/// remote messages</param>
- public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler)
- : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler)
+ /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
+ public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ITcpPortProvider tcpPortProvider)
+ : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, tcpPortProvider)
{
}
@@ -63,11 +66,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// <param name="localEndpoint">Endpoint to listen on</param>
/// <param name="remoteHandler">The handler to invoke when receiving incoming
/// remote messages</param>
- public WritableTransportServer(IPEndPoint localEndpoint,
- IObserver<TransportEvent<T>> remoteHandler)
+ /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
+ public WritableTransportServer(
+ IPEndPoint localEndpoint,
+ IObserver<TransportEvent<T>> remoteHandler,
+ ITcpPortProvider tcpPortProvider)
{
_listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
_remoteObserver = remoteHandler;
+ _tcpPortProvider = tcpPortProvider;
_cancellationSource = new CancellationTokenSource();
_cancellationSource.Token.ThrowIfCancellationRequested();
_disposed = false;
@@ -86,10 +93,46 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
/// </summary>
public void Run()
{
- _listener.Start();
+ if (LocalEndpoint.Port == 0)
+ {
+ FindAPortAndStartListener();
+ }
+ else
+ {
+ _listener.Start();
+ }
+
_serverTask = Task.Run(() => StartServer());
}
+ private void FindAPortAndStartListener()
+ {
+ var foundAPort = false;
+ var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
+ for (var enumerator = _tcpPortProvider.GetEnumerator();
+ !foundAPort && enumerator.MoveNext();
+ )
+ {
+ _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
+ try
+ {
+ _listener.Start();
+ foundAPort = true;
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ }
+ if (!foundAPort)
+ {
+ Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
+ }
+ LOGGER.Log(Level.Info,
+ String.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
+ }
+
+
/// <summary>
/// Close the TransportServer and all open connections
/// </summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeCount.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeCount.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeCount.cs
new file mode 100644
index 0000000..164e2bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeCount.cs
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote.Parameters
+{
+ [NamedParameter(Documentation = "Port number count in the range for listening on tcp ports", DefaultValue = "1000")
+ ]
+ public class TcpPortRangeCount : Name<int>
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeSeed.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeSeed.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeSeed.cs
new file mode 100644
index 0000000..f60f169
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeSeed.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Remote.Parameters
+{
+ [NamedParameter(Documentation = "Seed for the random port number generator", DefaultValue = "0")]
+ public class TcpPortRangeSeed: Name<int>
+ {
+ }
+}