You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:05:50 UTC
[29/51] [partial] incubator-reef git commit: [REEF-131] Towards the
new .Net project structure This is to change .Net project structure for Tang,
Wake, REEF utilities, Common and Driver:
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
new file mode 100644
index 0000000..3de6e22
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Utilities.Logging
+{
+ public class Logger
+ {
+ private static readonly string[] LogLevel = new string[]
+ {
+ "OFF",
+ "ERROR",
+ "WARNING",
+ "START",
+ "EXIT",
+ "INFO",
+ "VERBOSE"
+ };
+
+ private static readonly Dictionary<Level, TraceEventType> EventTypes
+ = new Dictionary<Level, TraceEventType>()
+ {
+ { Level.Off, TraceEventType.Stop },
+ { Level.Error, TraceEventType.Error },
+ { Level.Warning, TraceEventType.Warning },
+ { Level.Start, TraceEventType.Start },
+ { Level.Stop, TraceEventType.Stop },
+ { Level.Info, TraceEventType.Information },
+ { Level.Verbose, TraceEventType.Verbose },
+ };
+
+ private static Level _customLevel = Level.Verbose;
+
+ private static List<TraceListener> _traceListeners;
+
+ private string _name;
+
+ private TraceSource _traceSource;
+
+ private Logger(string name)
+ {
+ _name = name;
+ _traceSource = new TraceSource(_name, SourceLevels.All);
+ CustcomLevel = _customLevel;
+ if (TraceListeners.Count == 0)
+ {
+ // before customized listener is added, we would need to log to console
+ _traceSource.Listeners.Add(new ConsoleTraceListener());
+ }
+ else
+ {
+ _traceSource.Listeners.Clear();
+ foreach (TraceListener listener in TraceListeners)
+ {
+ _traceSource.Listeners.Add(listener);
+ }
+ }
+ }
+
+ public static Level CustcomLevel
+ {
+ get
+ {
+ return _customLevel;
+ }
+
+ set
+ {
+ _customLevel = value;
+ }
+ }
+
+ public static List<TraceListener> TraceListeners
+ {
+ get
+ {
+ if (_traceListeners == null)
+ {
+ _traceListeners = new List<TraceListener>();
+ }
+ return _traceListeners;
+ }
+ }
+
+ public static void SetCustomLevel(Level customLevel)
+ {
+ _customLevel = customLevel;
+ }
+
+ public static void AddTraceListner(TraceListener listener)
+ {
+ TraceListeners.Add(listener);
+ }
+
+ public static Logger GetLogger(Type type)
+ {
+ return GetLogger(type.FullName);
+ }
+
+ public static Logger GetLogger(string name)
+ {
+ return new Logger(name);
+ }
+
+ /// <summary>
+ /// Log the message with the specified Log Level.
+ ///
+ /// If addtional arguments are passed, the message will be treated as
+ /// a format string. The format string and the additional arguments
+ /// will be formatted according to string.Format()
+ /// </summary>
+ /// <param name="level"></param>
+ /// <param name="formatStr"></param>
+ /// <param name="args"></param>
+ public void Log(Level level, string formatStr, params object[] args)
+ {
+ if (CustcomLevel >= level)
+ {
+ string msg = FormatMessage(formatStr, args);
+ string logMessage =
+ DateTime.Now.ToString("o", CultureInfo.InvariantCulture)
+ + " "
+ + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString("D4", CultureInfo.InvariantCulture)
+ + Environment.NewLine + LogLevel[(int)level] + ": "
+ + msg;
+
+ _traceSource.TraceEvent(
+ EventTypes[level],
+ 0, // we don't use event id for now, but this can be useful for e2e logging later
+ logMessage);
+ }
+ }
+
+ public void Log(Level level, string msg, Exception exception)
+ {
+ string exceptionLog = string.Empty;
+ if (exception != null)
+ {
+ exceptionLog = string.Format(
+ CultureInfo.InvariantCulture,
+ "encountered error [{0}] with mesage [{1}] and stack trace [{2}]",
+ exception,
+ exception.Message,
+ exception.StackTrace);
+ }
+ Log(level, msg + exceptionLog);
+ }
+
+ public IDisposable LogFunction(string function, params object[] args)
+ {
+ return LogScope(function, args);
+ }
+
+ public IDisposable LogScope(string format, params object[] args)
+ {
+ return new LoggingScope(this, DateTime.Now + " " + format, args);
+ }
+
+ private string FormatMessage(string formatStr, params object[] args)
+ {
+ return args.Length > 0 ? string.Format(CultureInfo.CurrentCulture, formatStr, args) : formatStr;
+ }
+
+ /// <summary>
+ /// Represents a logging scope.
+ /// </summary>
+ /// <remarks>
+ /// A start log is written when an instance is created
+ /// and a stop trace is written when the instance is disposed.
+ /// </remarks>
+ private sealed class LoggingScope : IDisposable
+ {
+ private readonly Stopwatch _stopWatch;
+
+ private readonly Logger _logger;
+
+ private readonly string _content;
+
+ /// <summary>
+ /// Initializes a new instance of the LoggingScope class.
+ /// </summary>
+ /// <param name="logger"></param>
+ /// <param name="format"></param>
+ /// <param name="args"></param>
+ public LoggingScope(Logger logger, string format, params object[] args)
+ {
+ _logger = logger;
+
+ _stopWatch = Stopwatch.StartNew();
+
+ string content = args.Length > 0 ? string.Format(CultureInfo.InvariantCulture, format, args) : format;
+ _content = content;
+
+ _logger.Log(Level.Start, content);
+ }
+
+ /// <summary>
+ /// Logs the end of a scope.
+ /// </summary>
+ public void Dispose()
+ {
+ _logger.Log(Level.Stop, string.Format(CultureInfo.InvariantCulture, "{0}. Duration: [{1}].", _content, _stopWatch.Elapsed));
+ _stopWatch.Stop();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs b/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs
new file mode 100644
index 0000000..7c9a92c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Net;
+
+namespace Org.Apache.REEF.Utilities
+{
+ public class NetUtilities
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(NetUtilities));
+
+ public static IPEndPoint ParseIpEndpoint(string ipWithPort)
+ {
+ string ip = ipWithPort.TrimStart().TrimEnd();
+ if (char.IsDigit(ip[0]))
+ {
+ ip = @"socket://" + ip;
+ }
+ Uri uri = new Uri(ip);
+ string driverAddress = uri.Host;
+ int driverCommunicationPort = uri.Port;
+ IPAddress ipAddress;
+ IPAddress.TryParse(driverAddress, out ipAddress);
+ if (ipAddress == null)
+ {
+ Exceptions.Throw(new FormatException("invalid format for ip: " + ipWithPort), LOGGER);
+ }
+
+ return new IPEndPoint(ipAddress, driverCommunicationPort);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Optional.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Optional.cs b/lang/cs/Org.Apache.REEF.Utilities/Optional.cs
new file mode 100644
index 0000000..11d95fa
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/Optional.cs
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Utilities
+{
+ [Serializable]
+ public sealed class Optional<T>
+ {
+ private readonly T _value;
+
+ private Optional(T value)
+ {
+ _value = value;
+ }
+
+ private Optional()
+ {
+ _value = default(T);
+ }
+
+ public T Value
+ {
+ get { return _value; }
+ }
+
+ public static Optional<T> Of(T value)
+ {
+ if (value == null)
+ {
+ Diagnostics.Exceptions.Throw(new ArgumentNullException("value", "Passed a null value. Use OfNullable() instead"), Logger.GetLogger(typeof(Optional<T>)));
+ }
+ return new Optional<T>(value);
+ }
+
+ public static Optional<T> Empty()
+ {
+ return new Optional<T>();
+ }
+
+ public static Optional<T> OfNullable(T value)
+ {
+ if (value == null)
+ {
+ return Empty();
+ }
+ else
+ {
+ return Of(value);
+ }
+ }
+
+ public T OrElse(T other)
+ {
+ if (IsPresent())
+ {
+ return Value;
+ }
+ else
+ {
+ return other;
+ }
+ }
+
+ public bool IsPresent()
+ {
+ return (_value != null);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null || obj.GetType() != this.GetType())
+ {
+ return false;
+ }
+ Optional<T> optional = (Optional<T>)obj;
+ if (_value != null ? !_value.Equals(optional.Value) : optional.Value != null)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ public override int GetHashCode()
+ {
+ return _value != null ? _value.GetHashCode() : 0;
+ }
+
+ public override string ToString()
+ {
+ return "Optional{value=" + _value + "}";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj b/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj
new file mode 100644
index 0000000..27899de
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{79E7F89A-1DFB-45E1-8D43-D71A954AEB98}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.REEF.Utilities</RootNamespace>
+ <AssemblyName>Org.Apache.REEF.Utilities</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <RestorePackages>true</RestorePackages>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
+ </PropertyGroup>
+ <Import Project="$(SolutionDir)\Source\build.props" />
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Microsoft.Hadoop.Avro">
+ <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
+ <Reference Include="Newtonsoft.Json">
+ <HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="AvroUtils.cs" />
+ <Compile Include="ByteUtilities.cs" />
+ <Compile Include="Diagnostics\DiagnosticsMessages.cs" />
+ <Compile Include="Diagnostics\Exceptions.cs" />
+ <Compile Include="IIdentifiable.cs" />
+ <Compile Include="IMessage.cs" />
+ <Compile Include="Logging\JavaLoggingSetting.cs" />
+ <Compile Include="Logging\Level.cs" />
+ <Compile Include="Logging\Logger.cs" />
+ <Compile Include="NetUtilities.cs" />
+ <Compile Include="Optional.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="ValidationUtilities.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..5401a32
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Utilities")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Utilities")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("a7bda51a-552a-4fba-a834-f715c19454ab")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs b/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs
new file mode 100644
index 0000000..80507fc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Utilities
+{
+ public class ValidationUtilities
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(ValidationUtilities));
+
+ public static string ValidateEnvVariable(string env)
+ {
+ string envVariable = Environment.GetEnvironmentVariable(env);
+ if (string.IsNullOrWhiteSpace(envVariable))
+ {
+ Console.WriteLine(string.Format(CultureInfo.InvariantCulture, "{0} not set. Please set the environment variable first. Exiting...", env));
+ string msg = string.Format(CultureInfo.InvariantCulture, "No {0} found.", env);
+ Diagnostics.Exceptions.Throw(new InvalidOperationException(msg), msg, LOGGER);
+ }
+ return envVariable;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/packages.config b/lang/cs/Org.Apache.REEF.Utilities/packages.config
new file mode 100644
index 0000000..c60eef8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/packages.config
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+ <package id="Microsoft.Hadoop.Avro" version="1.4.0.0" targetFramework="net45" />
+ <package id="Newtonsoft.Json" version="6.0.8" targetFramework="net45" />
+</packages>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs
new file mode 100644
index 0000000..3af063f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Runtime;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ [TestClass]
+ public class ClockTest
+ {
+ [TestMethod]
+ public void TestClock()
+ {
+ using (RuntimeClock clock = BuildClock())
+ {
+ Task.Run(new Action(clock.Run));
+
+ var heartBeat = new HeartbeatObserver(clock);
+ heartBeat.OnNext(null);
+ Thread.Sleep(5000);
+
+ Assert.AreEqual(100, heartBeat.EventCount);
+ }
+ }
+
+ [TestMethod]
+ public void TestAlarmRegistrationRaceConditions()
+ {
+ using (RuntimeClock clock = BuildClock())
+ {
+ Task.Run(new Action(clock.Run));
+
+ List<Alarm> events1 = new List<Alarm>();
+ List<Alarm> events2 = new List<Alarm>();
+
+ // Observers to record events that they have processed
+ IObserver<Alarm> earlierRecorder = Observer.Create<Alarm>(events1.Add);
+ IObserver<Alarm> laterRecorder = Observer.Create<Alarm>(events2.Add);
+
+ // Schedule a later alarm in the future
+ clock.ScheduleAlarm(5000, laterRecorder);
+
+ // After 1 second, schedule an earlier alarm that will fire before the later alarm
+ Thread.Sleep(1000);
+ clock.ScheduleAlarm(2000, earlierRecorder);
+
+ // The earlier alarm should not have fired after 1 second
+ Thread.Sleep(1000);
+ Assert.AreEqual(0, events1.Count);
+
+ // The earlier alarm will have fired after another 1.5 seconds, but the later will have not
+ Thread.Sleep(1500);
+ Assert.AreEqual(1, events1.Count);
+ Assert.AreEqual(0, events2.Count);
+
+ // The later alarm will have fired after 2 seconds
+ Thread.Sleep(2000);
+ Assert.AreEqual(1, events1.Count);
+ }
+ }
+
+ [TestMethod]
+ public void TestSimulatenousAlarms()
+ {
+ using (RuntimeClock clock = BuildClock())
+ {
+ Task.Run(new Action(clock.Run));
+
+ List<Alarm> events = new List<Alarm>();
+ IObserver<Alarm> eventRecorder = Observer.Create<Alarm>(events.Add);
+
+ clock.ScheduleAlarm(1000, eventRecorder);
+ clock.ScheduleAlarm(1000, eventRecorder);
+ clock.ScheduleAlarm(1000, eventRecorder);
+
+ Thread.Sleep(1500);
+ Assert.AreEqual(3, events.Count);
+ }
+ }
+
+ [TestMethod]
+ public void TestAlarmOrder()
+ {
+ using (RuntimeClock clock = BuildLogicalClock())
+ {
+ Task.Run(new Action(clock.Run));
+
+ // Event handler to record event time stamps
+ List<long> recordedTimestamps = new List<long>();
+ IObserver<Alarm> eventRecorder = Observer.Create<Alarm>(alarm => recordedTimestamps.Add(alarm.TimeStamp));
+
+ // Schedule 10 alarms every 100 ms
+ List<long> expectedTimestamps = Enumerable.Range(0, 10).Select(offset => (long)offset * 100).ToList();
+ expectedTimestamps.ForEach(offset => clock.ScheduleAlarm(offset, eventRecorder));
+
+ // Check that the recorded timestamps are in the same order that they were scheduled
+ Thread.Sleep(1500);
+ Assert.IsTrue(expectedTimestamps.SequenceEqual(recordedTimestamps));
+ }
+ }
+
+ private RuntimeClock BuildClock()
+ {
+ var builder = TangFactory.GetTang().NewConfigurationBuilder();
+
+ return TangFactory.GetTang()
+ .NewInjector(builder.Build())
+ .GetInstance<RuntimeClock>();
+ }
+
+ private RuntimeClock BuildLogicalClock()
+ {
+ var builder = TangFactory.GetTang().NewConfigurationBuilder();
+ builder.BindImplementation(GenericType<ITimer>.Class, GenericType<LogicalTimer>.Class);
+
+ return TangFactory.GetTang()
+ .NewInjector(builder.Build())
+ .GetInstance<RuntimeClock>();
+ }
+
+ private class HeartbeatObserver : IObserver<Alarm>
+ {
+ private RuntimeClock _clock;
+
+ public HeartbeatObserver(RuntimeClock clock)
+ {
+ _clock = clock;
+ EventCount = 0;
+ }
+
+ public int EventCount { get; set; }
+
+ public void OnNext(Alarm value)
+ {
+ EventCount++;
+ if (EventCount < 100)
+ {
+ _clock.ScheduleAlarm(10, this);
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs
new file mode 100644
index 0000000..61e8ebd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Text;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ [TestClass]
+ public class MultiCodecTest
+ {
+ [TestMethod]
+ public void TestMultiCodec()
+ {
+ MultiCodec<BaseEvent> codec = new MultiCodec<BaseEvent>();
+ codec.Register(new Event1Codec());
+ codec.Register(new Event2Codec());
+
+ byte[] d1Data = codec.Encode(new Event1(42));
+ byte[] d2Data = codec.Encode(new Event2("Tony"));
+
+ Event1 e1 = (Event1)codec.Decode(d1Data);
+ Event2 e2 = (Event2)codec.Decode(d2Data);
+
+ Assert.AreEqual(42, e1.Number);
+ Assert.AreEqual("Tony", e2.Name);
+ }
+
+ private class BaseEvent
+ {
+ }
+
+ private class Event1 : BaseEvent
+ {
+ public Event1(int number)
+ {
+ Number = number;
+ }
+
+ public int Number { get; set; }
+ }
+
+ private class Event2 : BaseEvent
+ {
+ public Event2(string name)
+ {
+ Name = name;
+ }
+
+ public string Name { get; set; }
+ }
+
+ private class Event1Codec : ICodec<Event1>
+ {
+ public byte[] Encode(Event1 obj)
+ {
+ return BitConverter.GetBytes(obj.Number);
+ }
+
+ public Event1 Decode(byte[] data)
+ {
+ return new Event1(BitConverter.ToInt32(data, 0));
+ }
+ }
+
+ private class Event2Codec : ICodec<Event2>
+ {
+ public byte[] Encode(Event2 obj)
+ {
+ return Encoding.ASCII.GetBytes(obj.Name);
+ }
+
+ public Event2 Decode(byte[] data)
+ {
+ return new Event2(Encoding.ASCII.GetString(data));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
new file mode 100644
index 0000000..3ae017d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{214C64C6-04E5-4867-B69A-E3502EA50871}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.REEF.Wake.Tests</RootNamespace>
+ <AssemblyName>Org.Apache.REEF.Wake.Tests</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
+ <RestorePackages>true</RestorePackages>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Reactive.Core">
+ <HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Interfaces">
+ <HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="ClockTest.cs" />
+ <Compile Include="MultiCodecTest.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="PubSubSubjectTest.cs" />
+ <Compile Include="RemoteManagerTest.cs" />
+ <Compile Include="TransportTest.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+ <Name>Org.Apache.REEF.Tang</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+ <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+ <Name>Org.Apache.REEF.Utilities</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+ <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+ <Name>Org.Apache.REEF.Wake</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <ItemGroup>
+ <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..47e58c2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Wake.Tests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Wake.Tests")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("68a2ef80-e51b-4abb-9ccc-81354e152758")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs
new file mode 100644
index 0000000..7ce56d3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Reactive;
+using System.Threading;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.RX.Impl;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ [TestClass]
+ public class PubSubSubjectTest
+ {
+ [TestMethod]
+ public void TestPubSubSubjectSingleThread()
+ {
+ int sum = 0;
+
+ // Observer that adds sum of numbers up to and including x
+ PubSubSubject<int> subject = new PubSubSubject<int>();
+ subject.Subscribe(Observer.Create<int>(
+ x =>
+ {
+ for (int i = 0; i <= x; i++)
+ {
+ sum += i;
+ }
+ }));
+
+ subject.OnNext(10);
+ subject.OnCompleted();
+ Assert.AreEqual(sum, 55);
+ }
+
+ [TestMethod]
+ public void TestPubSubSubjectMultipleThreads()
+ {
+ int sum = 0;
+
+ PubSubSubject<int> subject = new PubSubSubject<int>();
+ subject.Subscribe(Observer.Create<int>(x => sum += x));
+
+ Thread[] threads = new Thread[10];
+ for (int i = 0; i < threads.Length; i++)
+ {
+ threads[i] = new Thread(() =>
+ {
+ for (int j = 0; j < 10000; j++)
+ {
+ subject.OnNext(1);
+ }
+ });
+
+ threads[i].Start();
+ }
+
+ foreach (Thread thread in threads)
+ {
+ thread.Join();
+ }
+
+ Assert.AreEqual(sum, 100000);
+ }
+
+ [TestMethod]
+ public void TestMultipleTypes()
+ {
+ int sum1 = 0;
+ int sum2 = 0;
+
+ PubSubSubject<SuperEvent> subject = new PubSubSubject<SuperEvent>();
+ subject.Subscribe(Observer.Create<SubEvent1>(x => sum1 += 100));
+ subject.Subscribe(Observer.Create<SubEvent2>(x => sum2 += 500));
+
+ subject.OnNext(new SubEvent1());
+ subject.OnNext(new SubEvent2());
+ subject.OnNext(new SubEvent2());
+
+ Assert.AreEqual(sum1, 100);
+ Assert.AreEqual(sum2, 1000);
+ }
+
+ [TestMethod]
+ public void TestOnCompleted()
+ {
+ int sum = 0;
+
+ PubSubSubject<int> subject = new PubSubSubject<int>();
+ subject.Subscribe(Observer.Create<int>(x => sum += x));
+
+ subject.OnNext(10);
+ Assert.AreEqual(10, sum);
+
+ subject.OnNext(10);
+ Assert.AreEqual(20, sum);
+
+ // Check that after calling OnCompleted, OnNext will do nothing
+ subject.OnCompleted();
+ subject.OnNext(10);
+ Assert.AreEqual(20, sum);
+ }
+
+ [TestMethod]
+ public void TestOnError()
+ {
+ int sum = 0;
+
+ PubSubSubject<int> subject = new PubSubSubject<int>();
+ subject.Subscribe(Observer.Create<int>(x => sum += x));
+
+ subject.OnNext(10);
+ Assert.AreEqual(10, sum);
+
+ subject.OnNext(10);
+ Assert.AreEqual(20, sum);
+
+ // Check that after calling OnError, OnNext will do nothing
+ subject.OnError(new Exception("error"));
+ subject.OnNext(10);
+ Assert.AreEqual(20, sum);
+ }
+
+ [TestMethod]
+ public void TestDisposeSingleSubject()
+ {
+ int sum = 0;
+
+ PubSubSubject<int> subject = new PubSubSubject<int>();
+ var disposable = subject.Subscribe(Observer.Create<int>(x => sum += x));
+
+ subject.OnNext(10);
+ subject.OnNext(10);
+ subject.OnNext(10);
+ Assert.AreEqual(30, sum);
+
+ // Unregister the subject and check that calling OnNext does nothing
+ disposable.Dispose();
+ subject.OnNext(10);
+ Assert.AreEqual(30, sum);
+ }
+
+ [TestMethod]
+ public void TestDisposeMultipleSubjects()
+ {
+ int sum1 = 0;
+ int sum2 = 0;
+
+ SubEvent1 event1 = new SubEvent1();
+ SubEvent2 event2 = new SubEvent2();
+
+ PubSubSubject<SuperEvent> subject = new PubSubSubject<SuperEvent>();
+ var disposable1 = subject.Subscribe(Observer.Create<SubEvent1>(x => sum1 += 100));
+ var disposable2 = subject.Subscribe(Observer.Create<SubEvent2>(x => sum2 += 500));
+
+ subject.OnNext(event1);
+ subject.OnNext(event2);
+ subject.OnNext(event2);
+ Assert.AreEqual(sum1, 100);
+ Assert.AreEqual(sum2, 1000);
+
+ // Check that unsubscribing from SubEvent1 does not affect other subscriptions
+ disposable1.Dispose();
+ subject.OnNext(event1);
+ subject.OnNext(event2);
+ Assert.AreEqual(sum1, 100);
+ Assert.AreEqual(sum2, 1500);
+
+ // Unsubscribe from the remaining event types
+ disposable2.Dispose();
+ subject.OnNext(event1);
+ subject.OnNext(event2);
+ Assert.AreEqual(sum1, 100);
+ Assert.AreEqual(sum2, 1500);
+ }
+
+ class SuperEvent
+ {
+ }
+
+ class SubEvent1 : SuperEvent
+ {
+ }
+
+ class SubEvent2 : SuperEvent
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
new file mode 100644
index 0000000..3b3ac6d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ [TestClass]
+ public class RemoteManagerTest
+ {
+ [TestMethod]
+ public void TestOneWayCommunication()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ {
+ var observer = Observer.Create<string>(queue.Add);
+ IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+ remoteManager2.RegisterObserver(endpoint1, observer);
+
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
+ remoteObserver.OnNext("ghi");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ [TestMethod]
+ public void TestOneWayCommunicationClientOnly()
+ {
+ int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000);
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = new DefaultRemoteManager<string>(new StringCodec()))
+ using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, listeningPort, new StringCodec()))
+ {
+ IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer = Observer.Create<string>(queue.Add);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer);
+
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
+ remoteObserver.OnNext("ghi");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ [TestMethod]
+ public void TestTwoWayCommunication()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<string> queue1 = new BlockingCollection<string>();
+ BlockingCollection<string> queue2 = new BlockingCollection<string>();
+ List<string> events1 = new List<string>();
+ List<string> events2 = new List<string>();
+
+ using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ {
+ // Register observers for remote manager 1 and remote manager 2
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer1 = Observer.Create<string>(queue1.Add);
+ var observer2 = Observer.Create<string>(queue2.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+ // Remote manager 1 sends 3 events to remote manager 2
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("def");
+ remoteObserver1.OnNext("ghi");
+
+ // Remote manager 2 sends 4 events to remote manager 1
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ remoteObserver2.OnNext("jkl");
+ remoteObserver2.OnNext("mno");
+ remoteObserver2.OnNext("pqr");
+ remoteObserver2.OnNext("stu");
+
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
+ }
+
+ Assert.AreEqual(4, events1.Count);
+ Assert.AreEqual(3, events2.Count);
+ }
+
+ [TestMethod]
+ public void TestCommunicationThreeNodesOneWay()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager3 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ {
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer = Observer.Create<string>(queue.Add);
+ remoteManager3.RegisterObserver(remoteEndpoint, observer);
+
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+ remoteObserver2.OnNext("abc");
+ remoteObserver1.OnNext("def");
+ remoteObserver2.OnNext("ghi");
+ remoteObserver1.OnNext("jkl");
+ remoteObserver2.OnNext("mno");
+
+ for (int i = 0; i < 5; i++)
+ {
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(5, events.Count);
+ }
+
+ [TestMethod]
+ public void TestCommunicationThreeNodesBothWays()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<string> queue1 = new BlockingCollection<string>();
+ BlockingCollection<string> queue2 = new BlockingCollection<string>();
+ BlockingCollection<string> queue3 = new BlockingCollection<string>();
+ List<string> events1 = new List<string>();
+ List<string> events2 = new List<string>();
+ List<string> events3 = new List<string>();
+
+ using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager3 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ {
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+
+ var observer = Observer.Create<string>(queue1.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer);
+ var observer2 = Observer.Create<string>(queue2.Add);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+ var observer3 = Observer.Create<string>(queue3.Add);
+ remoteManager3.RegisterObserver(remoteEndpoint, observer3);
+
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+ // Observer 1 and 2 send messages to observer 3
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("abc");
+ remoteObserver2.OnNext("def");
+ remoteObserver2.OnNext("def");
+
+ // Observer 3 sends messages back to observers 1 and 2
+ var remoteObserver3a = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ var remoteObserver3b = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
+
+ remoteObserver3a.OnNext("ghi");
+ remoteObserver3a.OnNext("ghi");
+ remoteObserver3b.OnNext("jkl");
+ remoteObserver3b.OnNext("jkl");
+ remoteObserver3b.OnNext("jkl");
+
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
+
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ }
+
+ Assert.AreEqual(2, events1.Count);
+ Assert.AreEqual(3, events2.Count);
+ Assert.AreEqual(5, events3.Count);
+ }
+
+ [TestMethod]
+ public void TestRemoteSenderCallback()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ {
+ // Register handler for when remote manager 2 receives events; respond
+ // with an ack
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+
+ var receiverObserver = Observer.Create<string>(
+ message => remoteObserver2.OnNext("received message: " + message));
+ remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
+
+ // Register handler for remote manager 1 to record the ack
+ var senderObserver = Observer.Create<string>(queue.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
+
+ // Begin to send messages
+ var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext("hello");
+ remoteObserver1.OnNext("there");
+ remoteObserver1.OnNext("buddy");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+
+ Assert.AreEqual(3, events.Count);
+ Assert.AreEqual("received message: hello", events[0]);
+ Assert.AreEqual("received message: there", events[1]);
+ Assert.AreEqual("received message: buddy", events[2]);
+ }
+
+ [TestMethod]
+ public void TestRegisterObserverByType()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ {
+ // RemoteManager2 listens and records events of type IRemoteEvent<string>
+ var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message));
+ remoteManager2.RegisterObserver(observer);
+
+ // Remote manager 1 sends 3 events to remote manager 2
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
+ remoteObserver.OnNext("ghi");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ [TestMethod]
+ public void TestCachedConnection()
+ {
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+
+ using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+ {
+ var observer = Observer.Create<string>(queue.Add);
+ IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+ remoteManager2.RegisterObserver(endpoint1, observer);
+
+ var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
+
+ var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ cachedObserver.OnNext("ghi");
+ cachedObserver.OnNext("jkl");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+
+ Assert.AreEqual(4, events.Count);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
new file mode 100644
index 0000000..3e67e0d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ [TestClass]
+ public class TransportTest
+ {
+ [TestMethod]
+ public void TestTransportServer()
+ {
+ ICodec<string> codec = new StringCodec();
+ int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+ using (var server = new TransportServer<string>(endpoint, remoteHandler, codec))
+ {
+ server.Run();
+
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ using (var client = new TransportClient<string>(remoteEndpoint, codec))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send("World!");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ [TestMethod]
+ public void TestTransportServerEvent()
+ {
+ ICodec<TestEvent> codec = new TestEventCodec();
+ int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+ BlockingCollection<TestEvent> queue = new BlockingCollection<TestEvent>();
+ List<TestEvent> events = new List<TestEvent>();
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+ var remoteHandler = Observer.Create<TransportEvent<TestEvent>>(tEvent => queue.Add(tEvent.Data));
+
+ using (var server = new TransportServer<TestEvent>(endpoint, remoteHandler, codec))
+ {
+ server.Run();
+
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ using (var client = new TransportClient<TestEvent>(remoteEndpoint, codec))
+ {
+ client.Send(new TestEvent("Hello"));
+ client.Send(new TestEvent(", "));
+ client.Send(new TestEvent("World!"));
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ [TestMethod]
+ public void TestTransportSenderStage()
+ {
+ ICodec<string> codec = new StringCodec();
+ int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+
+ List<string> events = new List<string>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+
+ // Server echoes the message back to the client
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => tEvent.Link.Write(tEvent.Data));
+
+ using (TransportServer<string> server = new TransportServer<string>(endpoint, remoteHandler, codec))
+ {
+ server.Run();
+
+ var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ using (var client = new TransportClient<string>(remoteEndpoint, codec, clientHandler))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send(" World");
+
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(3, events.Count);
+ }
+
+ [TestMethod]
+ public void TestRaceCondition()
+ {
+ ICodec<string> codec = new StringCodec();
+ int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+ BlockingCollection<string> queue = new BlockingCollection<string>();
+ List<string> events = new List<string>();
+ int numEventsExpected = 150;
+
+ IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+ var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+ using (var server = new TransportServer<string>(endpoint, remoteHandler, codec))
+ {
+ server.Run();
+
+ for (int i = 0; i < numEventsExpected / 3; i++)
+ {
+ Task.Run(() =>
+ {
+ IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+ using (var client = new TransportClient<string>(remoteEndpoint, codec))
+ {
+ client.Send("Hello");
+ client.Send(", ");
+ client.Send("World!");
+ }
+ });
+ }
+
+ for (int i = 0; i < numEventsExpected; i++)
+ {
+ events.Add(queue.Take());
+ }
+ }
+
+ Assert.AreEqual(numEventsExpected, events.Count);
+ }
+
+ private class TestEvent
+ {
+ public TestEvent(string message)
+ {
+ Message = message;
+ }
+
+ public string Message { get; set; }
+
+ public override string ToString()
+ {
+ return "TestEvent: " + Message;
+ }
+ }
+
+ private class TestEventCodec : ICodec<TestEvent>
+ {
+ public byte[] Encode(TestEvent obj)
+ {
+ return new StringCodec().Encode(obj.Message);
+ }
+
+ public TestEvent Decode(byte[] data)
+ {
+ return new TestEvent(new StringCodec().Decode(data));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
new file mode 100644
index 0000000..75e5b34
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+ <package id="Rx-Core" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
+</packages>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs b/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs
new file mode 100644
index 0000000..ec78c82
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+ /// <summary>
+ /// An estage that implements metering
+ /// </summary>
+ /// <typeparam name="T">The estage type</typeparam>
+ public abstract class AbstractEStage<T> : IEStage<T>
+ {
+ /// <summary>Constructs an abstract estage</summary>
+ /// <param name="meterName">the meter name</param>
+ protected AbstractEStage(string meterName)
+ {
+ }
+
+ /// <summary>Updates the meter</summary>
+ /// <param name="value">an event</param>
+ public virtual void OnNext(T value)
+ {
+ }
+
+ public abstract void Dispose();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IEStage.cs b/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
new file mode 100644
index 0000000..d9e91b6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+ /// <summary>Stage that executes an event handler</summary>
+ public interface IEStage<T> : IEventHandler<T>, IStage
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
new file mode 100644
index 0000000..6ee267d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+ /// <summary>
+ /// Handler to process an event
+ /// </summary>
+ /// <typeparam name="T">The type of event</typeparam>
+ public interface IEventHandler<T>
+ {
+ /// <summary>
+ /// Process an event
+ /// </summary>
+ /// <param name="value">The event to process</param>
+ void OnNext(T value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs b/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
new file mode 100644
index 0000000..3ccaf1f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+ /// <summary>
+ /// Identifier class for REEF.
+ ///
+ /// Identifiers are a generic naming primitive that carry some information about
+ /// the type of object that they point to.
+ ///
+ /// Examples include remote sockets or filenames.
+ /// </summary>
+ public abstract class IIdentifier
+ {
+ /// <summary>
+ /// Returns a hash code for the object
+ /// </summary>
+ /// <returns>The hash code value for this object</returns>
+ public abstract override int GetHashCode();
+
+ /// <summary>
+ /// Checks that another object is equal to this object
+ /// </summary>
+ /// <param name="o">The object to compare</param>
+ /// <returns>True if the object is the same as the object argument; false, otherwise</returns>
+ public abstract override bool Equals(object o);
+
+ /// <summary>
+ /// Returns a string representation of this object
+ /// </summary>
+ /// <returns>A string representation of this object</returns>
+ public abstract override string ToString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs b/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
new file mode 100644
index 0000000..9e781f3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+ [DefaultImplementation(typeof(StringIdentifierFactory))]
+ public interface IIdentifierFactory
+ {
+ IIdentifier Create(string s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs b/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
new file mode 100644
index 0000000..d43e4b3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+ public interface IObserverFactory
+ {
+ object Create();
+ }
+}