You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:05:50 UTC

[29/51] [partial] incubator-reef git commit: [REEF-131] Towards the new .Net project structure This is to change .Net project structure for Tang, Wake, REEF utilities, Common and Driver:

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
new file mode 100644
index 0000000..3de6e22
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Utilities.Logging
+{
+    /// <summary>
+    /// Thin logging facade over <see cref="TraceSource"/>.
+    /// </summary>
+    /// <remarks>
+    /// The level threshold and the list of custom trace listeners are static and
+    /// therefore shared by all loggers. A logger copies the registered listeners
+    /// when it is constructed, so listeners registered afterwards only affect
+    /// loggers that are created later.
+    /// </remarks>
+    public class Logger
+    {
+        // Display names, indexed by the numeric value of the Level passed to Log().
+        // NOTE(review): assumes the Level enum (declared elsewhere) is numbered 0..6 in this order - confirm.
+        private static readonly string[] LogLevel = new string[]
+            {
+                "OFF",
+                "ERROR",
+                "WARNING",
+                "START",
+                "EXIT",
+                "INFO",
+                "VERBOSE"
+            };
+
+        // Maps each Level onto the TraceEventType handed to the TraceSource.
+        private static readonly Dictionary<Level, TraceEventType> EventTypes
+            = new Dictionary<Level, TraceEventType>()
+                    {
+                        { Level.Off, TraceEventType.Stop },
+                        { Level.Error, TraceEventType.Error },
+                        { Level.Warning, TraceEventType.Warning },
+                        { Level.Start, TraceEventType.Start },
+                        { Level.Stop, TraceEventType.Stop },
+                        { Level.Info, TraceEventType.Information },
+                        { Level.Verbose, TraceEventType.Verbose },
+                    };
+
+        // Created eagerly so that no lazy initialization is needed on first access.
+        private static readonly List<TraceListener> _traceListeners = new List<TraceListener>();
+
+        private static Level _customLevel = Level.Verbose;
+
+        private readonly string _name;
+
+        private readonly TraceSource _traceSource;
+
+        private Logger(string name)
+        {
+            _name = name;
+            _traceSource = new TraceSource(_name, SourceLevels.All);
+            if (TraceListeners.Count == 0)
+            {
+                // before a customized listener is added, we need to log to the console
+                _traceSource.Listeners.Add(new ConsoleTraceListener());
+            }
+            else
+            {
+                _traceSource.Listeners.Clear();
+                foreach (TraceListener listener in TraceListeners)
+                {
+                    _traceSource.Listeners.Add(listener);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the threshold shared by all loggers: a message is written
+        /// only when this value is greater than or equal to the message's level.
+        /// </summary>
+        public static Level CustcomLevel
+        {
+            get
+            {
+                return _customLevel;
+            }
+
+            set
+            {
+                _customLevel = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets the listeners that are attached to every logger created from now on.
+        /// </summary>
+        public static List<TraceListener> TraceListeners
+        {
+            get
+            {
+                return _traceListeners;
+            }
+        }
+
+        /// <summary>
+        /// Sets the threshold shared by all loggers (same effect as setting <see cref="CustcomLevel"/>).
+        /// </summary>
+        /// <param name="customLevel">The new threshold.</param>
+        public static void SetCustomLevel(Level customLevel)
+        {
+            _customLevel = customLevel;
+        }
+
+        /// <summary>
+        /// Registers a listener that is attached to loggers created after this call.
+        /// </summary>
+        /// <param name="listener">The listener to register.</param>
+        public static void AddTraceListner(TraceListener listener)
+        {
+            TraceListeners.Add(listener);
+        }
+
+        /// <summary>
+        /// Creates a logger named after the full name of the given type.
+        /// </summary>
+        /// <param name="type">The type the logger is created for.</param>
+        /// <returns>A new logger instance.</returns>
+        /// <exception cref="ArgumentNullException">If <paramref name="type"/> is null.</exception>
+        public static Logger GetLogger(Type type)
+        {
+            if (type == null)
+            {
+                throw new ArgumentNullException("type");
+            }
+            return GetLogger(type.FullName);
+        }
+
+        /// <summary>
+        /// Creates a logger with the given name. Every call returns a new instance.
+        /// </summary>
+        /// <param name="name">The name of the underlying trace source.</param>
+        /// <returns>A new logger instance.</returns>
+        public static Logger GetLogger(string name)
+        {
+            return new Logger(name);
+        }
+
+        /// <summary>
+        /// Log the message with the specified Log Level.
+        ///
+        /// If additional arguments are passed, the message will be treated as
+        /// a format string.  The format string and the additional arguments
+        /// will be formatted according to string.Format()
+        /// </summary>
+        /// <param name="level">The level of the message; it is dropped when above the shared threshold.</param>
+        /// <param name="formatStr">The message, or a composite format string when arguments are given.</param>
+        /// <param name="args">Optional format arguments.</param>
+        public void Log(Level level, string formatStr, params object[] args)
+        {
+            if (CustcomLevel >= level)
+            {
+                string msg = FormatMessage(formatStr, args);
+                string logMessage =
+                    DateTime.Now.ToString("o", CultureInfo.InvariantCulture)
+                    + " "
+                    + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString("D4", CultureInfo.InvariantCulture)
+                    + Environment.NewLine + LogLevel[(int)level] + ": "
+                    + msg;
+
+                _traceSource.TraceEvent(
+                    EventTypes[level],
+                    0, // we don't use event id for now, but this can be useful for e2e logging later
+                    logMessage);
+            }
+        }
+
+        /// <summary>
+        /// Logs a message followed by a description of the given exception.
+        /// </summary>
+        /// <param name="level">The level of the message.</param>
+        /// <param name="msg">The message; it is written verbatim, not treated as a format string.</param>
+        /// <param name="exception">The exception to describe; may be null, in which case only the message is logged.</param>
+        public void Log(Level level, string msg, Exception exception)
+        {
+            if (CustcomLevel < level)
+            {
+                // The message would be filtered out anyway: skip rendering the exception and its stack trace.
+                return;
+            }
+
+            string exceptionLog = string.Empty;
+            if (exception != null)
+            {
+                exceptionLog = string.Format(
+                    CultureInfo.InvariantCulture,
+                    "encountered error [{0}] with mesage [{1}] and stack trace [{2}]",
+                    exception,
+                    exception.Message,
+                    exception.StackTrace);
+            }
+            Log(level, msg + exceptionLog);
+        }
+
+        /// <summary>
+        /// Starts a logging scope for a function; equivalent to <see cref="LogScope"/>.
+        /// </summary>
+        /// <param name="function">The scope description, or a format string when arguments are given.</param>
+        /// <param name="args">Optional format arguments.</param>
+        /// <returns>A handle that logs the end of the scope when disposed.</returns>
+        public IDisposable LogFunction(string function, params object[] args)
+        {
+            return LogScope(function, args);
+        }
+
+        /// <summary>
+        /// Logs the start of a scope immediately and its end, including the elapsed time,
+        /// when the returned object is disposed.
+        /// </summary>
+        /// <param name="format">The scope description, or a format string when arguments are given.</param>
+        /// <param name="args">Optional format arguments.</param>
+        /// <returns>A handle that logs the end of the scope when disposed.</returns>
+        public IDisposable LogScope(string format, params object[] args)
+        {
+            return new LoggingScope(this, DateTime.Now + " " + format, args);
+        }
+
+        private string FormatMessage(string formatStr, params object[] args)
+        {
+            // args is null when a caller passes an explicit null array; treat it like "no arguments".
+            return args != null && args.Length > 0 ? string.Format(CultureInfo.CurrentCulture, formatStr, args) : formatStr;
+        }
+
+        /// <summary>
+        /// Represents a logging scope.
+        /// </summary>
+        /// <remarks>
+        /// A start log is written when an instance is created
+        /// and a stop trace is written when the instance is disposed.
+        /// </remarks>
+        private sealed class LoggingScope : IDisposable
+        {
+            private readonly Stopwatch _stopWatch;
+
+            private readonly Logger _logger;
+
+            private readonly string _content;
+
+            private bool _disposed;
+
+            /// <summary>
+            /// Initializes a new instance of the LoggingScope class.
+            /// </summary>
+            /// <param name="logger">The logger the start and stop records are written to.</param>
+            /// <param name="format">The scope description, or a format string when arguments are given.</param>
+            /// <param name="args">Optional format arguments.</param>
+            public LoggingScope(Logger logger, string format, params object[] args)
+            {
+                _logger = logger;
+
+                _stopWatch = Stopwatch.StartNew();
+
+                string content = args != null && args.Length > 0 ? string.Format(CultureInfo.InvariantCulture, format, args) : format;
+                _content = content;
+
+                _logger.Log(Level.Start, content);
+            }
+
+            /// <summary>
+            /// Logs the end of a scope. Calls after the first one have no effect.
+            /// </summary>
+            public void Dispose()
+            {
+                if (_disposed)
+                {
+                    return;
+                }
+                _disposed = true;
+
+                // Stop first so that the reported duration is final and excludes the logging call itself.
+                _stopWatch.Stop();
+                _logger.Log(Level.Stop, string.Format(CultureInfo.InvariantCulture, "{0}. Duration: [{1}].", _content, _stopWatch.Elapsed));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs b/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs
new file mode 100644
index 0000000..7c9a92c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/NetUtilities.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Net;
+
+namespace Org.Apache.REEF.Utilities
+{
+    /// <summary>
+    /// Network related helper methods.
+    /// </summary>
+    public class NetUtilities
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(NetUtilities));
+
+        /// <summary>
+        /// Parses an IP address with a port into an <see cref="IPEndPoint"/>.
+        /// </summary>
+        /// <param name="ipWithPort">
+        /// The endpoint, e.g. "10.0.0.1:8080". Surrounding whitespace is ignored. Input that starts
+        /// with a digit is prefixed with "socket://" so that it can be parsed as a URI; any other
+        /// input must already be an absolute URI.
+        /// </param>
+        /// <returns>The endpoint made of the parsed address and the port of the URI.</returns>
+        /// <remarks>
+        /// Invalid input is reported as a <see cref="FormatException"/> through Exceptions.Throw,
+        /// which is declared elsewhere in the project and presumably logs and throws.
+        /// </remarks>
+        public static IPEndPoint ParseIpEndpoint(string ipWithPort)
+        {
+            if (string.IsNullOrWhiteSpace(ipWithPort))
+            {
+                // Report missing input like any other malformed input, rather than failing below
+                // with a NullReferenceException or an IndexOutOfRangeException.
+                Exceptions.Throw(new FormatException("invalid format for ip: " + ipWithPort), LOGGER);
+            }
+
+            string ip = ipWithPort.Trim();
+            if (char.IsDigit(ip[0]))
+            {
+                // A bare "ip:port" is not an absolute URI; add a scheme so that Uri can split host and port.
+                ip = @"socket://" + ip;
+            }
+            Uri uri = new Uri(ip);
+            string driverAddress = uri.Host;
+            int driverCommunicationPort = uri.Port;
+            IPAddress ipAddress;
+            if (!IPAddress.TryParse(driverAddress, out ipAddress))
+            {
+                Exceptions.Throw(new FormatException("invalid format for ip: " + ipWithPort), LOGGER);
+            }
+
+            return new IPEndPoint(ipAddress, driverCommunicationPort);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Optional.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Optional.cs b/lang/cs/Org.Apache.REEF.Utilities/Optional.cs
new file mode 100644
index 0000000..11d95fa
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/Optional.cs
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Utilities
+{
+    /// <summary>
+    /// A container that either holds a value or is empty.
+    /// </summary>
+    /// <typeparam name="T">The type of the contained value.</typeparam>
+    [Serializable]
+    public sealed class Optional<T>
+    {
+        private readonly T _value;
+
+        // Presence is tracked explicitly: comparing _value with null cannot detect an
+        // empty Optional when T is a non-nullable value type, because default(T) is not null.
+        private readonly bool _hasValue;
+
+        private Optional(T value)
+        {
+            _value = value;
+            _hasValue = value != null;
+        }
+
+        private Optional()
+        {
+            _value = default(T);
+            _hasValue = false;
+        }
+
+        /// <summary>
+        /// Gets the contained value, or default(T) when this instance is empty.
+        /// </summary>
+        public T Value
+        {
+            get { return _value; }
+        }
+
+        /// <summary>
+        /// Wraps a value that must not be null.
+        /// </summary>
+        /// <param name="value">The value to wrap.</param>
+        /// <returns>An Optional holding the value.</returns>
+        /// <remarks>
+        /// A null value is reported as an <see cref="ArgumentNullException"/> through
+        /// Diagnostics.Exceptions.Throw, which is declared elsewhere in the project and
+        /// presumably logs and throws.
+        /// </remarks>
+        public static Optional<T> Of(T value)
+        {
+            if (value == null)
+            {
+                Diagnostics.Exceptions.Throw(new ArgumentNullException("value", "Passed a null value. Use OfNullable() instead"), Logger.GetLogger(typeof(Optional<T>)));
+            }
+            return new Optional<T>(value);
+        }
+
+        /// <summary>
+        /// Creates an Optional that holds no value.
+        /// </summary>
+        /// <returns>An empty Optional.</returns>
+        public static Optional<T> Empty()
+        {
+            return new Optional<T>();
+        }
+
+        /// <summary>
+        /// Wraps a value that may be null.
+        /// </summary>
+        /// <param name="value">The value to wrap.</param>
+        /// <returns>An empty Optional when the value is null, otherwise an Optional holding the value.</returns>
+        public static Optional<T> OfNullable(T value)
+        {
+            if (value == null)
+            {
+                return Empty();
+            }
+            else
+            {
+                return Of(value);
+            }
+        }
+
+        /// <summary>
+        /// Returns the contained value, or the given fallback when this instance is empty.
+        /// </summary>
+        /// <param name="other">The fallback value.</param>
+        /// <returns>The contained value if present, otherwise <paramref name="other"/>.</returns>
+        public T OrElse(T other)
+        {
+            if (IsPresent())
+            {
+                return Value;
+            }
+            else
+            {
+                return other;
+            }
+        }
+
+        /// <summary>
+        /// Tells whether this instance holds a value.
+        /// </summary>
+        /// <returns>True when a value is present, false for an empty Optional (also for value types).</returns>
+        public bool IsPresent()
+        {
+            return _hasValue;
+        }
+
+        /// <summary>
+        /// Two Optionals of the same type are equal when both are empty or both hold equal values.
+        /// </summary>
+        /// <param name="obj">The object to compare with.</param>
+        /// <returns>True when the objects are equal.</returns>
+        public override bool Equals(object obj)
+        {
+            if (ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+            if (obj == null || obj.GetType() != this.GetType())
+            {
+                return false;
+            }
+            Optional<T> optional = (Optional<T>)obj;
+            if (_hasValue != optional._hasValue)
+            {
+                return false;
+            }
+            return !_hasValue || _value.Equals(optional._value);
+        }
+
+        /// <summary>
+        /// Returns the hash code of the contained value, or 0 for an empty Optional.
+        /// </summary>
+        /// <returns>The hash code.</returns>
+        public override int GetHashCode()
+        {
+            return _hasValue ? _value.GetHashCode() : 0;
+        }
+
+        /// <summary>
+        /// Returns a textual representation of the form "Optional{value=...}".
+        /// </summary>
+        /// <returns>The textual representation.</returns>
+        public override string ToString()
+        {
+            return "Optional{value=" + _value + "}";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj b/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj
new file mode 100644
index 0000000..27899de
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/Org.Apache.Reef.Utilities.csproj
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProjectGuid>{79E7F89A-1DFB-45E1-8D43-D71A954AEB98}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Org.Apache.REEF.Utilities</RootNamespace>
+    <AssemblyName>Org.Apache.REEF.Utilities</AssemblyName>
+    <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <RestorePackages>true</RestorePackages>
+    <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
+  </PropertyGroup>
+  <Import Project="$(SolutionDir)\Source\build.props" />
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <PlatformTarget>AnyCPU</PlatformTarget>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <PlatformTarget>AnyCPU</PlatformTarget>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="Microsoft.Hadoop.Avro">
+      <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath>
+    </Reference>
+    <Reference Include="Newtonsoft.Json">
+      <HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath>
+    </Reference>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="AvroUtils.cs" />
+    <Compile Include="ByteUtilities.cs" />
+    <Compile Include="Diagnostics\DiagnosticsMessages.cs" />
+    <Compile Include="Diagnostics\Exceptions.cs" />
+    <Compile Include="IIdentifiable.cs" />
+    <Compile Include="IMessage.cs" />
+    <Compile Include="Logging\JavaLoggingSetting.cs" />
+    <Compile Include="Logging\Level.cs" />
+    <Compile Include="Logging\Logger.cs" />
+    <Compile Include="NetUtilities.cs" />
+    <Compile Include="Optional.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="ValidationUtilities.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="packages.config" />
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..5401a32
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following 
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Utilities")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Utilities")]
+[assembly: AssemblyCopyright("Copyright ©  2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible 
+// to COM components.  If you need to access a type in this assembly from 
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("a7bda51a-552a-4fba-a834-f715c19454ab")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version 
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers 
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs b/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs
new file mode 100644
index 0000000..80507fc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/ValidationUtilities.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Utilities
+{
+    /// <summary>
+    /// Helpers for validating the process environment.
+    /// </summary>
+    public class ValidationUtilities
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ValidationUtilities));
+
+        /// <summary>
+        /// Reads an environment variable and reports an error when it is missing or blank.
+        /// </summary>
+        /// <param name="env">The name of the environment variable to read.</param>
+        /// <returns>The value of the environment variable.</returns>
+        /// <remarks>
+        /// A missing or blank variable is announced on the console and then reported as an
+        /// <see cref="InvalidOperationException"/> through Diagnostics.Exceptions.Throw, which
+        /// is declared elsewhere in the project and presumably logs and throws.
+        /// </remarks>
+        public static string ValidateEnvVariable(string env)
+        {
+            string value = Environment.GetEnvironmentVariable(env);
+            if (!string.IsNullOrWhiteSpace(value))
+            {
+                return value;
+            }
+
+            string notice = string.Format(CultureInfo.InvariantCulture, "{0} not set. Please set the environment variable first. Exiting...", env);
+            Console.WriteLine(notice);
+
+            string msg = string.Format(CultureInfo.InvariantCulture, "No {0} found.", env);
+            Diagnostics.Exceptions.Throw(new InvalidOperationException(msg), msg, LOGGER);
+            return value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Utilities/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/packages.config b/lang/cs/Org.Apache.REEF.Utilities/packages.config
new file mode 100644
index 0000000..c60eef8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Utilities/packages.config
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+  <package id="Microsoft.Hadoop.Avro" version="1.4.0.0" targetFramework="net45" />
+  <package id="Newtonsoft.Json" version="6.0.8" targetFramework="net45" />
+</packages>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs
new file mode 100644
index 0000000..3af063f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ClockTest.cs
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Runtime;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// Tests the alarm scheduling of <see cref="RuntimeClock"/>.
+    /// </summary>
+    [TestClass]
+    public class ClockTest
+    {
+        /// <summary>
+        /// A self-rescheduling observer must have been invoked exactly 100 times after 5 seconds.
+        /// </summary>
+        [TestMethod]
+        public void TestClock()
+        {
+            using (RuntimeClock clock = BuildClock())
+            {
+                Task.Run(new Action(clock.Run));
+
+                // The first call is made by hand; every call below the limit schedules the next alarm.
+                var heartBeat = new HeartbeatObserver(clock);
+                heartBeat.OnNext(null);
+                Thread.Sleep(5000);
+
+                Assert.AreEqual(100, heartBeat.EventCount);
+            }
+        }
+
+        /// <summary>
+        /// An alarm that is registered later but due earlier must fire before an alarm
+        /// that was registered first but is due later.
+        /// </summary>
+        [TestMethod]
+        public void TestAlarmRegistrationRaceConditions()
+        {
+            using (RuntimeClock clock = BuildClock())
+            {
+                Task.Run(new Action(clock.Run));
+
+                List<Alarm> events1 = new List<Alarm>();
+                List<Alarm> events2 = new List<Alarm>();
+
+                // Observers to record events that they have processed
+                IObserver<Alarm> earlierRecorder = Observer.Create<Alarm>(events1.Add);
+                IObserver<Alarm> laterRecorder = Observer.Create<Alarm>(events2.Add);
+
+                // Schedule a later alarm in the future
+                clock.ScheduleAlarm(5000, laterRecorder);
+
+                // After 1 second, schedule an earlier alarm that will fire before the later alarm
+                Thread.Sleep(1000);
+                clock.ScheduleAlarm(2000, earlierRecorder);
+
+                // The earlier alarm should not have fired after 1 second
+                Thread.Sleep(1000);
+                Assert.AreEqual(0, events1.Count);
+
+                // The earlier alarm will have fired after another 1.5 seconds, but the later will have not
+                Thread.Sleep(1500);
+                Assert.AreEqual(1, events1.Count);
+                Assert.AreEqual(0, events2.Count);
+
+                // The later alarm will have fired after 2 seconds
+                Thread.Sleep(2000);
+                Assert.AreEqual(1, events1.Count);
+                Assert.AreEqual(1, events2.Count);
+            }
+        }
+
+        /// <summary>
+        /// Three alarms scheduled with the same offset must all be delivered.
+        /// </summary>
+        [TestMethod]
+        public void TestSimulatenousAlarms()
+        {
+            using (RuntimeClock clock = BuildClock())
+            {
+                Task.Run(new Action(clock.Run));
+
+                List<Alarm> events = new List<Alarm>();
+                IObserver<Alarm> eventRecorder = Observer.Create<Alarm>(events.Add);
+
+                clock.ScheduleAlarm(1000, eventRecorder);
+                clock.ScheduleAlarm(1000, eventRecorder);
+                clock.ScheduleAlarm(1000, eventRecorder);
+
+                Thread.Sleep(1500);
+                Assert.AreEqual(3, events.Count);
+            }
+        }
+
+        /// <summary>
+        /// With a <see cref="LogicalTimer"/> bound, alarms must be delivered in the order of their time stamps.
+        /// </summary>
+        [TestMethod]
+        public void TestAlarmOrder()
+        {
+            using (RuntimeClock clock = BuildLogicalClock())
+            {
+                Task.Run(new Action(clock.Run));
+
+                // Event handler to record event time stamps
+                List<long> recordedTimestamps = new List<long>();
+                IObserver<Alarm> eventRecorder = Observer.Create<Alarm>(alarm => recordedTimestamps.Add(alarm.TimeStamp));
+
+                //  Schedule 10 alarms every 100 ms
+                List<long> expectedTimestamps = Enumerable.Range(0, 10).Select(offset => (long)offset * 100).ToList();
+                expectedTimestamps.ForEach(offset => clock.ScheduleAlarm(offset, eventRecorder));
+
+                // Check that the recorded timestamps are in the same order that they were scheduled
+                Thread.Sleep(1500);
+                Assert.IsTrue(expectedTimestamps.SequenceEqual(recordedTimestamps));
+            }
+        }
+
+        /// <summary>
+        /// Creates a clock from an empty Tang configuration, i.e. with the default bindings.
+        /// </summary>
+        private RuntimeClock BuildClock()
+        {
+            var builder = TangFactory.GetTang().NewConfigurationBuilder();
+
+            return TangFactory.GetTang()
+                              .NewInjector(builder.Build())
+                              .GetInstance<RuntimeClock>();
+        }
+
+        /// <summary>
+        /// Creates a clock whose <see cref="ITimer"/> is bound to <see cref="LogicalTimer"/>.
+        /// </summary>
+        private RuntimeClock BuildLogicalClock()
+        {
+            var builder = TangFactory.GetTang().NewConfigurationBuilder();
+            builder.BindImplementation(GenericType<ITimer>.Class, GenericType<LogicalTimer>.Class);
+
+            return TangFactory.GetTang()
+                              .NewInjector(builder.Build())
+                              .GetInstance<RuntimeClock>();
+        }
+
+        /// <summary>
+        /// Counts its invocations and schedules a follow-up alarm until 100 have been counted.
+        /// </summary>
+        private class HeartbeatObserver : IObserver<Alarm>
+        {
+            private readonly RuntimeClock _clock;
+
+            public HeartbeatObserver(RuntimeClock clock)
+            {
+                _clock = clock;
+                EventCount = 0;
+            }
+
+            /// <summary>
+            /// Gets or sets the number of OnNext calls seen so far.
+            /// </summary>
+            public int EventCount { get; set; }
+
+            public void OnNext(Alarm value)
+            {
+                EventCount++;
+                if (EventCount < 100)
+                {
+                    // Re-arm with an offset of 10 so that the observer keeps itself running.
+                    _clock.ScheduleAlarm(10, this);
+                }
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs
new file mode 100644
index 0000000..61e8ebd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/MultiCodecTest.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Text;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    [TestClass]
+    public class MultiCodecTest // Round-trips two event subtypes through a single MultiCodec.
+    {
+        [TestMethod]
+        public void TestMultiCodec() // Encodes one Event1 and one Event2, decodes both and checks their payloads.
+        {
+            var multiCodec = new MultiCodec<BaseEvent>();
+            multiCodec.Register(new Event1Codec());
+            multiCodec.Register(new Event2Codec());
+
+            byte[] encodedNumberEvent = multiCodec.Encode(new Event1(42));
+            byte[] encodedNameEvent = multiCodec.Encode(new Event2("Tony"));
+
+            var decodedNumberEvent = (Event1)multiCodec.Decode(encodedNumberEvent);
+            var decodedNameEvent = (Event2)multiCodec.Decode(encodedNameEvent);
+
+            Assert.AreEqual(42, decodedNumberEvent.Number);
+            Assert.AreEqual("Tony", decodedNameEvent.Name);
+        }
+
+        private class BaseEvent // Common base type the MultiCodec under test is parameterized with.
+        {
+        }
+
+        private class Event1 : BaseEvent // Event carrying a single int.
+        {
+            public Event1(int number)
+            {
+                this.Number = number;
+            }
+
+            public int Number { get; set; }
+        }
+
+        private class Event2 : BaseEvent // Event carrying a single string.
+        {
+            public Event2(string name)
+            {
+                this.Name = name;
+            }
+
+            public string Name { get; set; }
+        }
+
+        private class Event1Codec : ICodec<Event1> // Serializes Event1.Number via BitConverter.
+        {
+            public byte[] Encode(Event1 obj)
+            {
+                return BitConverter.GetBytes(obj.Number); // int -> bytes
+            }
+
+            public Event1 Decode(byte[] data)
+            {
+                return new Event1(BitConverter.ToInt32(data, 0)); // bytes -> int, read from offset 0
+            }
+        }
+
+        private class Event2Codec : ICodec<Event2> // Serializes Event2.Name as ASCII bytes.
+        {
+            public byte[] Encode(Event2 obj)
+            {
+                return Encoding.ASCII.GetBytes(obj.Name); // string -> ASCII bytes
+            }
+
+            public Event2 Decode(byte[] data)
+            {
+                return new Event2(Encoding.ASCII.GetString(data)); // ASCII bytes -> string
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
new file mode 100644
index 0000000..3ae017d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProjectGuid>{214C64C6-04E5-4867-B69A-E3502EA50871}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Org.Apache.REEF.Wake.Tests</RootNamespace>
+    <AssemblyName>Org.Apache.REEF.Wake.Tests</AssemblyName>
+    <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
+    <RestorePackages>true</RestorePackages>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>..\bin\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Reactive.Core">
+      <HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
+    </Reference>
+    <Reference Include="System.Reactive.Interfaces">
+      <HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+    </Reference>
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="ClockTest.cs" />
+    <Compile Include="MultiCodecTest.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="PubSubSubjectTest.cs" />
+    <Compile Include="RemoteManagerTest.cs" />
+    <Compile Include="TransportTest.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+      <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+      <Name>Org.Apache.REEF.Tang</Name>
+    </ProjectReference>
+    <ProjectReference Include="..\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+      <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+      <Name>Org.Apache.REEF.Utilities</Name>
+    </ProjectReference>
+    <ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+      <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+      <Name>Org.Apache.REEF.Wake</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="packages.config" />
+  </ItemGroup>
+  <ItemGroup>
+    <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..47e58c2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following 
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Wake.Tests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Wake.Tests")]
+[assembly: AssemblyCopyright("Copyright ©  2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible 
+// to COM components.  If you need to access a type in this assembly from 
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("68a2ef80-e51b-4abb-9ccc-81354e152758")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version 
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers 
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs
new file mode 100644
index 0000000..7ce56d3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/PubSubSubjectTest.cs
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Reactive;
+using System.Threading;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.RX.Impl;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    [TestClass]
+    public class PubSubSubjectTest // Tests PubSubSubject delivery, per-type routing, completion, error and unsubscription; asserts use MSTest's (expected, actual) order.
+    {
+        [TestMethod]
+        public void TestPubSubSubjectSingleThread() // A single OnNext(10) must make the observer add 0 + 1 + ... + 10 = 55.
+        {
+            int sum = 0;
+
+            // Observer that adds sum of numbers up to and including x
+            PubSubSubject<int> subject = new PubSubSubject<int>();
+            subject.Subscribe(Observer.Create<int>(
+                x =>
+                {
+                    for (int i = 0; i <= x; i++)
+                    {
+                        sum += i;
+                    }
+                }));
+
+            subject.OnNext(10);
+            subject.OnCompleted();
+            Assert.AreEqual(55, sum);
+        }
+
+        [TestMethod]
+        public void TestPubSubSubjectMultipleThreads() // 10 threads each publish 10000 ones; the observer must have summed all 100000.
+        {
+            int sum = 0;
+
+            PubSubSubject<int> subject = new PubSubSubject<int>();
+            subject.Subscribe(Observer.Create<int>(x => sum += x)); // NOTE(review): += is not atomic; presumably the subject serializes OnNext - confirm.
+
+            Thread[] threads = new Thread[10];
+            for (int i = 0; i < threads.Length; i++)
+            {
+                threads[i] = new Thread(() =>
+                {
+                    for (int j = 0; j < 10000; j++)
+                    {
+                        subject.OnNext(1);
+                    }
+                });
+
+                threads[i].Start();
+            }
+
+            foreach (Thread thread in threads)
+            {
+                thread.Join();
+            }
+
+            Assert.AreEqual(100000, sum);
+        }
+
+        [TestMethod]
+        public void TestMultipleTypes() // Each observer must only be invoked for events of its own subtype.
+        {
+            int sum1 = 0;
+            int sum2 = 0;
+
+            PubSubSubject<SuperEvent> subject = new PubSubSubject<SuperEvent>();
+            subject.Subscribe(Observer.Create<SubEvent1>(x => sum1 += 100));
+            subject.Subscribe(Observer.Create<SubEvent2>(x => sum2 += 500));
+
+            subject.OnNext(new SubEvent1());
+            subject.OnNext(new SubEvent2());
+            subject.OnNext(new SubEvent2());
+
+            Assert.AreEqual(100, sum1);
+            Assert.AreEqual(1000, sum2);
+        }
+
+        [TestMethod]
+        public void TestOnCompleted() // OnNext after OnCompleted must not reach the observer.
+        {
+            int sum = 0;
+
+            PubSubSubject<int> subject = new PubSubSubject<int>();
+            subject.Subscribe(Observer.Create<int>(x => sum += x));
+
+            subject.OnNext(10);
+            Assert.AreEqual(10, sum);
+
+            subject.OnNext(10);
+            Assert.AreEqual(20, sum);
+
+            // Check that after calling OnCompleted, OnNext will do nothing
+            subject.OnCompleted();
+            subject.OnNext(10);
+            Assert.AreEqual(20, sum);
+        }
+
+        [TestMethod]
+        public void TestOnError() // OnNext after OnError must not reach the observer.
+        {
+            int sum = 0;
+
+            PubSubSubject<int> subject = new PubSubSubject<int>();
+            subject.Subscribe(Observer.Create<int>(x => sum += x));
+
+            subject.OnNext(10);
+            Assert.AreEqual(10, sum);
+
+            subject.OnNext(10);
+            Assert.AreEqual(20, sum);
+
+            // Check that after calling OnError, OnNext will do nothing
+            subject.OnError(new Exception("error"));
+            subject.OnNext(10);
+            Assert.AreEqual(20, sum);
+        }
+
+        [TestMethod]
+        public void TestDisposeSingleSubject() // Disposing the subscription must stop delivery to that observer.
+        {
+            int sum = 0;
+
+            PubSubSubject<int> subject = new PubSubSubject<int>();
+            var disposable = subject.Subscribe(Observer.Create<int>(x => sum += x));
+
+            subject.OnNext(10);
+            subject.OnNext(10);
+            subject.OnNext(10);
+            Assert.AreEqual(30, sum);
+
+            // Unregister the subject and check that calling OnNext does nothing
+            disposable.Dispose();
+            subject.OnNext(10);
+            Assert.AreEqual(30, sum);
+        }
+
+        [TestMethod]
+        public void TestDisposeMultipleSubjects() // Disposing one subscription must leave the other subscription working.
+        {
+            int sum1 = 0;
+            int sum2 = 0;
+
+            SubEvent1 event1 = new SubEvent1();
+            SubEvent2 event2 = new SubEvent2();
+
+            PubSubSubject<SuperEvent> subject = new PubSubSubject<SuperEvent>();
+            var disposable1 = subject.Subscribe(Observer.Create<SubEvent1>(x => sum1 += 100));
+            var disposable2 = subject.Subscribe(Observer.Create<SubEvent2>(x => sum2 += 500));
+
+            subject.OnNext(event1);
+            subject.OnNext(event2);
+            subject.OnNext(event2);
+            Assert.AreEqual(100, sum1);
+            Assert.AreEqual(1000, sum2);
+
+            // Check that unsubscribing from SubEvent1 does not affect other subscriptions
+            disposable1.Dispose();
+            subject.OnNext(event1);
+            subject.OnNext(event2);
+            Assert.AreEqual(100, sum1);
+            Assert.AreEqual(1500, sum2);
+
+            // Unsubscribe from the remaining event types
+            disposable2.Dispose();
+            subject.OnNext(event1);
+            subject.OnNext(event2);
+            Assert.AreEqual(100, sum1);
+            Assert.AreEqual(1500, sum2);
+        }
+
+        private class SuperEvent // Base event type the subjects in the routing tests are parameterized with.
+        {
+        }
+
+        private class SubEvent1 : SuperEvent
+        {
+        }
+
+        private class SubEvent2 : SuperEvent
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
new file mode 100644
index 0000000..3b3ac6d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    [TestClass]
+    public class RemoteManagerTest // Loopback tests for DefaultRemoteManager<string>; each queue.Take() blocks until a message has been received.
+    {
+        [TestMethod]
+        public void TestOneWayCommunication() // Manager 1 sends three strings to manager 2; three must be received.
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            {
+                var observer = Observer.Create<string>(queue.Add);
+                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+                remoteManager2.RegisterObserver(endpoint1, observer);
+
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
+                remoteObserver.OnNext("ghi");
+
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        [TestMethod]
+        public void TestOneWayCommunicationClientOnly() // Sender is built from a codec only (no address/port); receiver listens on a generated port.
+        {
+            int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000);
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = new DefaultRemoteManager<string>(new StringCodec()))
+            using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, listeningPort, new StringCodec()))
+            {
+                IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer = Observer.Create<string>(queue.Add);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer);
+
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
+                remoteObserver.OnNext("ghi");
+
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        [TestMethod]
+        public void TestTwoWayCommunication() // Manager 1 sends three strings to manager 2 while manager 2 sends four to manager 1.
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<string> queue1 = new BlockingCollection<string>();
+            BlockingCollection<string> queue2 = new BlockingCollection<string>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+
+            using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            {
+                // Register observers for remote manager 1 and remote manager 2
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer1 = Observer.Create<string>(queue1.Add);
+                var observer2 = Observer.Create<string>(queue2.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("def");
+                remoteObserver1.OnNext("ghi");
+
+                // Remote manager 2 sends 4 events to remote manager 1
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                remoteObserver2.OnNext("jkl");
+                remoteObserver2.OnNext("mno");
+                remoteObserver2.OnNext("pqr");
+                remoteObserver2.OnNext("stu");
+
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
+            }
+
+            Assert.AreEqual(4, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+        }
+
+        [TestMethod]
+        public void TestCommunicationThreeNodesOneWay() // Managers 1 and 2 interleave five sends to manager 3; five must be received.
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager3 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            {
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer = Observer.Create<string>(queue.Add);
+                remoteManager3.RegisterObserver(remoteEndpoint, observer);
+
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+                remoteObserver2.OnNext("abc");
+                remoteObserver1.OnNext("def");
+                remoteObserver2.OnNext("ghi");
+                remoteObserver1.OnNext("jkl");
+                remoteObserver2.OnNext("mno");
+
+                for (int i = 0; i < 5; i++)
+                {
+                    events.Add(queue.Take());
+                }
+            }
+
+            Assert.AreEqual(5, events.Count);
+        }
+
+        [TestMethod]
+        public void TestCommunicationThreeNodesBothWays() // Managers 1 and 2 send to manager 3 (3 + 2 messages); manager 3 sends 2 back to manager 1 and 3 to manager 2.
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<string> queue1 = new BlockingCollection<string>();
+            BlockingCollection<string> queue2 = new BlockingCollection<string>();
+            BlockingCollection<string> queue3 = new BlockingCollection<string>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+            List<string> events3 = new List<string>();
+
+            using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager3 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            {
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+
+                var observer = Observer.Create<string>(queue1.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer);
+                var observer2 = Observer.Create<string>(queue2.Add);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+                var observer3 = Observer.Create<string>(queue3.Add);
+                remoteManager3.RegisterObserver(remoteEndpoint, observer3);
+
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+                // Observer 1 and 2 send messages to observer 3
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("abc");
+                remoteObserver2.OnNext("def");
+                remoteObserver2.OnNext("def");
+
+                // Observer 3 sends messages back to observers 1 and 2
+                var remoteObserver3a = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                var remoteObserver3b = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
+
+                remoteObserver3a.OnNext("ghi");
+                remoteObserver3a.OnNext("ghi");
+                remoteObserver3b.OnNext("jkl");
+                remoteObserver3b.OnNext("jkl");
+                remoteObserver3b.OnNext("jkl");
+
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
+
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+            }
+
+            Assert.AreEqual(2, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+            Assert.AreEqual(5, events3.Count);
+        }
+
+        [TestMethod]
+        public void TestRemoteSenderCallback() // Manager 2 answers every message with an ack; manager 1 must record the three acks in send order.
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            {
+                // Register handler for when remote manager 2 receives events; respond
+                // with an ack
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+
+                var receiverObserver = Observer.Create<string>(
+                    message => remoteObserver2.OnNext("received message: " + message));
+                remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
+
+                // Register handler for remote manager 1 to record the ack
+                var senderObserver = Observer.Create<string>(queue.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
+
+                // Begin to send messages
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext("hello");
+                remoteObserver1.OnNext("there");
+                remoteObserver1.OnNext("buddy");
+
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual("received message: hello", events[0]);
+            Assert.AreEqual("received message: there", events[1]);
+            Assert.AreEqual("received message: buddy", events[2]);
+        }
+        
+        [TestMethod]
+        public void TestRegisterObserverByType() // Registers an IRemoteMessage<string> observer without an endpoint and expects three messages.
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            {
+                // RemoteManager2 listens and records events of type IRemoteMessage<string>
+                var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message));
+                remoteManager2.RegisterObserver(observer);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
+                remoteObserver.OnNext("ghi");
+
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        [TestMethod]
+        public void TestCachedConnection() // Requests a remote observer for the same endpoint twice and sends two messages through each; four must be received.
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            using (var remoteManager2 = new DefaultRemoteManager<string>(listeningAddress, 0, new StringCodec()))
+            {
+                var observer = Observer.Create<string>(queue.Add);
+                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+                remoteManager2.RegisterObserver(endpoint1, observer);
+
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
+
+                var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                cachedObserver.OnNext("ghi");
+                cachedObserver.OnNext("jkl");
+
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+            }
+
+            Assert.AreEqual(4, events.Count);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
new file mode 100644
index 0000000..3e67e0d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// Round-trip tests for <see cref="TransportServer{T}"/> and
+    /// <see cref="TransportClient{T}"/>: a server is started on a randomly chosen
+    /// port and one or more clients connect to it through 127.0.0.1.
+    /// </summary>
+    [TestClass]
+    public class TransportTest
+    {
+        /// <summary>
+        /// Longest time, in milliseconds, a test waits for a single message (or for
+        /// the sending tasks) before failing. Without a bound, a lost message would
+        /// block the test run forever instead of producing a failure.
+        /// </summary>
+        private const int ReceiveTimeoutMs = 30000;
+
+        /// <summary>
+        /// Sends three strings from a client to the server and verifies that the
+        /// server-side handler receives exactly those payloads.
+        /// </summary>
+        [TestMethod]
+        public void TestTransportServer()
+        {
+            ICodec<string> codec = new StringCodec();
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+            string[] expected = { "Hello", ", ", "World!" };
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events;
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+            using (var server = new TransportServer<string>(endpoint, remoteHandler, codec))
+            {
+                server.Run();
+
+                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                using (var client = new TransportClient<string>(remoteEndpoint, codec))
+                {
+                    foreach (string message in expected)
+                    {
+                        client.Send(message);
+                    }
+
+                    events = Receive(queue, expected.Length);
+                }
+            }
+
+            Assert.AreEqual(expected.Length, events.Count);
+
+            // Arrival order is not asserted; only the set of payloads is.
+            CollectionAssert.AreEquivalent(expected, events);
+        }
+
+        /// <summary>
+        /// Same as <see cref="TestTransportServer"/>, but the payload is a
+        /// <see cref="TestEvent"/> serialized with <see cref="TestEventCodec"/>.
+        /// </summary>
+        [TestMethod]
+        public void TestTransportServerEvent()
+        {
+            ICodec<TestEvent> codec = new TestEventCodec();
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+            string[] expected = { "Hello", ", ", "World!" };
+            BlockingCollection<TestEvent> queue = new BlockingCollection<TestEvent>();
+            List<TestEvent> events;
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+            var remoteHandler = Observer.Create<TransportEvent<TestEvent>>(tEvent => queue.Add(tEvent.Data));
+
+            using (var server = new TransportServer<TestEvent>(endpoint, remoteHandler, codec))
+            {
+                server.Run();
+
+                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                using (var client = new TransportClient<TestEvent>(remoteEndpoint, codec))
+                {
+                    foreach (string message in expected)
+                    {
+                        client.Send(new TestEvent(message));
+                    }
+
+                    events = Receive(queue, expected.Length);
+                }
+            }
+
+            Assert.AreEqual(expected.Length, events.Count);
+
+            // Decoding creates new TestEvent instances (see TestEventCodec.Decode),
+            // so compare the carried messages rather than object references.
+            List<string> receivedMessages = events.ConvertAll(received => received.Message);
+            CollectionAssert.AreEquivalent(expected, receivedMessages);
+        }
+
+        /// <summary>
+        /// Verifies the reverse direction: the server writes every received message
+        /// back on the link it arrived on, and the client's own handler must
+        /// receive the echoed payloads.
+        /// </summary>
+        [TestMethod]
+        public void TestTransportSenderStage()
+        {
+            ICodec<string> codec = new StringCodec();
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+
+            string[] expected = { "Hello", ", ", " World" };
+            List<string> events;
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+
+            // Server echoes the message back to the client
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => tEvent.Link.Write(tEvent.Data));
+
+            using (TransportServer<string> server = new TransportServer<string>(endpoint, remoteHandler, codec))
+            {
+                server.Run();
+
+                var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                using (var client = new TransportClient<string>(remoteEndpoint, codec, clientHandler))
+                {
+                    foreach (string message in expected)
+                    {
+                        client.Send(message);
+                    }
+
+                    events = Receive(queue, expected.Length);
+                }
+            }
+
+            Assert.AreEqual(expected.Length, events.Count);
+            CollectionAssert.AreEquivalent(expected, events);
+        }
+
+        /// <summary>
+        /// Connects many clients concurrently, each sending three messages, and
+        /// verifies that the server-side handler receives every one of them.
+        /// </summary>
+        [TestMethod]
+        public void TestRaceCondition()
+        {
+            ICodec<string> codec = new StringCodec();
+            int port = NetworkUtils.GenerateRandomPort(6000, 7000);
+
+            string[] messagesPerClient = { "Hello", ", ", "World!" };
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events;
+            int numEventsExpected = 150;
+            int numClients = numEventsExpected / messagesPerClient.Length;
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port);
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+            using (var server = new TransportServer<string>(endpoint, remoteHandler, codec))
+            {
+                server.Run();
+
+                Task[] clients = new Task[numClients];
+                for (int i = 0; i < clients.Length; i++)
+                {
+                    clients[i] = Task.Run(() =>
+                    {
+                        IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
+                        using (var client = new TransportClient<string>(remoteEndpoint, codec))
+                        {
+                            foreach (string message in messagesPerClient)
+                            {
+                                client.Send(message);
+                            }
+                        }
+                    });
+                }
+
+                // Observe the client tasks: an exception thrown by any client is
+                // rethrown here (as an AggregateException) and fails the test,
+                // rather than being lost while the receive loop waits for messages
+                // that will never arrive.
+                bool allClientsFinished = Task.WaitAll(clients, ReceiveTimeoutMs);
+                Assert.IsTrue(allClientsFinished, "Timed out waiting for the client tasks to finish sending");
+
+                events = Receive(queue, numEventsExpected);
+            }
+
+            Assert.AreEqual(numEventsExpected, events.Count);
+
+            // Each distinct message must have been received once per client.
+            foreach (string message in messagesPerClient)
+            {
+                int occurrences = events.FindAll(received => received == message).Count;
+                Assert.AreEqual(numClients, occurrences, "Unexpected number of \"{0}\" messages", message);
+            }
+        }
+
+        /// <summary>
+        /// Takes <paramref name="count"/> items from <paramref name="queue"/>,
+        /// failing the test if any single item does not arrive within
+        /// <see cref="ReceiveTimeoutMs"/> milliseconds.
+        /// </summary>
+        /// <typeparam name="T">The type of the queued items</typeparam>
+        /// <param name="queue">The queue filled by the receiving handler</param>
+        /// <param name="count">The number of items to wait for</param>
+        /// <returns>The items, in the order they were taken from the queue</returns>
+        private static List<T> Receive<T>(BlockingCollection<T> queue, int count)
+        {
+            List<T> received = new List<T>(count);
+            for (int i = 0; i < count; i++)
+            {
+                T item;
+                if (!queue.TryTake(out item, ReceiveTimeoutMs))
+                {
+                    Assert.Fail("Timed out waiting for message {0} of {1}", i + 1, count);
+                }
+
+                received.Add(item);
+            }
+
+            return received;
+        }
+
+        /// <summary>Simple event type carrying a single string message.</summary>
+        private class TestEvent
+        {
+            /// <summary>Creates an event carrying the given message</summary>
+            /// <param name="message">The message to carry</param>
+            public TestEvent(string message)
+            {
+                Message = message;
+            }
+
+            /// <summary>The message carried by this event</summary>
+            public string Message { get; set; }
+
+            /// <summary>Returns the message prefixed with "TestEvent: "</summary>
+            public override string ToString()
+            {
+                return "TestEvent: " + Message;
+            }
+        }
+
+        /// <summary>
+        /// Codec for <see cref="TestEvent"/> that serializes only the event's
+        /// message, delegating the string encoding to <see cref="StringCodec"/>.
+        /// </summary>
+        private class TestEventCodec : ICodec<TestEvent>
+        {
+            /// <summary>Encodes the event's message as bytes</summary>
+            /// <param name="obj">The event to encode</param>
+            /// <returns>The encoded message</returns>
+            public byte[] Encode(TestEvent obj)
+            {
+                return new StringCodec().Encode(obj.Message);
+            }
+
+            /// <summary>Builds a new event from an encoded message</summary>
+            /// <param name="data">The bytes produced by <see cref="Encode"/></param>
+            /// <returns>A new event carrying the decoded message</returns>
+            public TestEvent Decode(byte[] data)
+            {
+                return new TestEvent(new StringCodec().Decode(data));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
new file mode 100644
index 0000000..75e5b34
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+  <package id="Rx-Core" version="2.2.5" targetFramework="net45" />
+  <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
+</packages>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs b/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs
new file mode 100644
index 0000000..ec78c82
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/AbstractEStage.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Base class for <see cref="IEStage{T}"/> implementations that are created
+    /// with a meter name.
+    /// </summary>
+    /// <remarks>
+    /// No metering is performed by this class as written: the constructor does
+    /// not use its argument and <see cref="OnNext"/> has an empty body.
+    /// </remarks>
+    /// <typeparam name="T">The type of event handled by the stage</typeparam>
+    public abstract class AbstractEStage<T> : IEStage<T>
+    {
+        /// <summary>Initializes the base part of a stage</summary>
+        /// <param name="meterName">The meter name; accepted but currently neither stored nor used</param>
+        protected AbstractEStage(string meterName)
+        {
+        }
+
+        /// <summary>Disposal is left to each concrete stage, which must implement it</summary>
+        public abstract void Dispose();
+
+        /// <summary>
+        /// Called with each event. The base implementation does nothing; it is
+        /// virtual so that subclasses can override it.
+        /// </summary>
+        /// <param name="value">The event</param>
+        public virtual void OnNext(T value)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IEStage.cs b/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
new file mode 100644
index 0000000..d9e91b6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IEStage.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Stage that executes an event handler: combines <see cref="IEventHandler{T}"/>
+    /// and <see cref="IStage"/> without declaring any members of its own.
+    /// </summary>
+    /// <typeparam name="T">The type of event handled by the stage</typeparam>
+    public interface IEStage<T> : IEventHandler<T>, IStage
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
new file mode 100644
index 0000000..6ee267d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IEventHandler.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Contract for a component that processes events of a single type; events
+    /// are handed to it through <see cref="OnNext"/>.
+    /// </summary>
+    /// <typeparam name="T">The type of event this handler processes</typeparam>
+    public interface IEventHandler<T>
+    {
+        /// <summary>
+        /// Processes one event
+        /// </summary>
+        /// <param name="value">The event to process</param>
+        void OnNext(T value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs b/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
new file mode 100644
index 0000000..3ccaf1f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IIdentifier.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Base type for REEF identifiers.
+    ///
+    /// An identifier is a generic name that also conveys some information about
+    /// the kind of object it refers to, for example a remote socket or a filename.
+    ///
+    /// The three <see cref="object"/> members below are re-declared as abstract,
+    /// so every concrete identifier has to supply its own equality, hashing and
+    /// string conversion.
+    /// </summary>
+    public abstract class IIdentifier
+    {
+        /// <summary>
+        /// Determines whether another object is equal to this identifier
+        /// </summary>
+        /// <param name="o">The object to compare with this identifier</param>
+        /// <returns>True if the two are considered equal; otherwise, false</returns>
+        public abstract override bool Equals(object o);
+
+        /// <summary>
+        /// Computes the hash code of this identifier
+        /// </summary>
+        /// <returns>The hash code value</returns>
+        public abstract override int GetHashCode();
+
+        /// <summary>
+        /// Produces the string representation of this identifier
+        /// </summary>
+        /// <returns>The string representation</returns>
+        public abstract override string ToString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs b/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
new file mode 100644
index 0000000..9e781f3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IIdentifierFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Factory that creates an <see cref="IIdentifier"/> from a string.
+    /// </summary>
+    /// <remarks>
+    /// The attribute below designates StringIdentifierFactory as the default
+    /// implementation of this interface.
+    /// </remarks>
+    [DefaultImplementation(typeof(StringIdentifierFactory))]
+    public interface IIdentifierFactory
+    {
+        /// <summary>Creates an identifier from the given string</summary>
+        /// <param name="s">The string to create the identifier from</param>
+        /// <returns>The created identifier</returns>
+        IIdentifier Create(string s);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs b/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
new file mode 100644
index 0000000..d43e4b3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IObserverFactory.cs
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>
+    /// Factory exposing a single parameterless <see cref="Create"/> method.
+    /// </summary>
+    /// <remarks>
+    /// NOTE(review): the interface name suggests that the returned object is an
+    /// observer, but the declared return type is <see cref="object"/>, so callers
+    /// presumably have to cast the result - confirm against the implementations.
+    /// </remarks>
+    public interface IObserverFactory
+    {
+        /// <summary>Provides the object produced by this factory</summary>
+        /// <returns>The produced object, untyped</returns>
+        object Create();
+    }
+}