You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:05:49 UTC

[28/51] [partial] incubator-reef git commit: [REEF-131] Towards the new .Net project structure This is to change .Net project structure for Tang, Wake, REEF utilities, Common and Driver:

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IStage.cs b/lang/cs/Org.Apache.REEF.Wake/IStage.cs
new file mode 100644
index 0000000..d7121a8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IStage.cs
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake
+{
+    /// <summary>An execution unit for events. It declares no members of its own; disposing it
+    /// (through <see cref="IDisposable"/>) is the way to reclaim its resources.</summary>
+    public interface IStage : IDisposable
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs
new file mode 100644
index 0000000..cd2bff5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>An observer that writes every event it receives to the log at Verbose level</summary>
+    public class LoggingEventHandler<T> : IObserver<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(T));
+
+        [Inject]
+        public LoggingEventHandler()
+        {
+        }
+
+        /// <summary>Logs the current local time and the event, separated by a space</summary>
+        /// <param name="value">an event</param>
+        public void OnNext(T value)
+        {
+            LOGGER.Log(Level.Verbose, "Event: " + DateTime.Now + " " + value);
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException(); // error notifications are not implemented by this handler
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException(); // completion notifications are not implemented either
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs
new file mode 100644
index 0000000..324eb61
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Time;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Observer of <see cref="StartTime"/> events that deliberately ignores them</summary>
+    public class MissingStartHandlerHandler : IObserver<StartTime>
+    {
+        [Inject]
+        public MissingStartHandlerHandler() { }
+
+        /// <summary>Does nothing (no "No binding to Clock.StartHandler" notice is logged), since this is only used for the evaluator, not the driver</summary>
+        public void OnNext(StartTime value)
+        {
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/></summary>
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/></summary>
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs
new file mode 100644
index 0000000..b157c75
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Event handler that dispatches an event to a specific handler based on an event class type</summary>
+    public class MultiEventHandler<T> : IEventHandler<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiEventHandler<T>));
+        private readonly IDictionary<Type, IEventHandler<T>> _map;
+
+        /// <summary>Constructs a multi-event handler; every key of the map is checked to be assignable to T</summary>
+        /// <param name="map">a map of class types to event handlers</param>
+        public MultiEventHandler(IDictionary<Type, IEventHandler<T>> map)
+        {
+            foreach (Type item in map.Keys)
+            {
+                if (!typeof(T).IsAssignableFrom(item))
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException(typeof(T) + " is not assignable from " + item), LOGGER);
+                }
+            }
+            _map = map;
+        }
+
+        /// <summary>Invokes the handler that the map holds for the runtime type of the event, if it exists</summary>
+        /// <param name="value">The event to handle; null is reported as an ArgumentNullException via Exceptions.Throw</param>
+        public void OnNext(T value)
+        {
+            if (value == null)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+            }
+            IEventHandler<T> handler;
+            if (_map.TryGetValue(value.GetType(), out handler))
+            {
+                handler.OnNext(value);
+            }
+            else
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("No event " + value.GetType() + " handler"), LOGGER);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs
new file mode 100644
index 0000000..543d342
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Periodic event for timers; the class declares no members, so it carries no data</summary>
+    public class PeriodicEvent
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs
new file mode 100644
index 0000000..38a36be
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>
+    /// Event handler to provide publish/subscribe interfaces
+    /// </summary>
+    /// <typeparam name="T">The base type of the events that can be published</typeparam>
+    public class PubSubEventHandler<T> : IEventHandler<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubEventHandler<T>));
+
+        private readonly Dictionary<Type, List<object>> _classToHandlersMap;
+
+        /// <summary>
+        /// Construct a pub-sub event handler
+        /// </summary>
+        public PubSubEventHandler()
+        {
+            _classToHandlersMap = new Dictionary<Type, List<object>>();
+        }
+
+        /// <summary>
+        /// Subscribe an event handler for an event type
+        /// </summary>
+        /// <typeparam name="U">The event type to subscribe to; it is T or derived from T</typeparam>
+        /// <param name="handler">The event handler</param>
+        public void Subscribe<U>(IEventHandler<U> handler) where U : T
+        {
+            lock (_classToHandlersMap)
+            {
+                List<object> handlers;
+                if (!_classToHandlersMap.TryGetValue(typeof(U), out handlers))
+                {
+                    handlers = new List<object>();
+                    _classToHandlersMap[typeof(U)] = handlers;
+                }
+                handlers.Add(handler);
+            }
+        }
+
+        /// <summary>
+        /// Invoke, via reflection and while holding the subscription lock, the handlers subscribed for the event's runtime type
+        /// </summary>
+        /// <param name="value">The event to process</param>
+        public void OnNext(T value)
+        {
+            if (value == null)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+            }
+
+            lock (_classToHandlersMap)
+            {
+                // Check that the event type has been subscribed
+                List<object> handlers;
+                if (!_classToHandlersMap.TryGetValue(value.GetType(), out handlers))
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException("No event for type " + value.GetType()), LOGGER);
+                }
+
+                // The OnNext method is the same for every handler of this event type: resolve it once, then invoke
+                Type handlerType = typeof(IEventHandler<>).MakeGenericType(new[] { value.GetType() });
+                MethodInfo info = handlerType.GetMethod("OnNext");
+                foreach (object handler in handlers)
+                {
+                    info.Invoke(handler, new[] { (object)value });
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs
new file mode 100644
index 0000000..e27f67e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Single thread stage that runs the event handler</summary>
+    public class SingleThreadStage<T> : AbstractEStage<T>
+    {
+        private readonly BlockingCollection<T> queue;
+        private readonly Producer<T> producer;
+        private readonly Thread thread;
+
+        /// <summary>Creates the bounded event queue and starts the thread that feeds it to the handler</summary>
+        public SingleThreadStage(IEventHandler<T> handler, int capacity) : base(handler.GetType().FullName)
+        {
+            queue = new BlockingCollection<T>(capacity);
+            producer = new Producer<T>(queue, handler, false);
+            thread = new Thread(new ThreadStart(producer.Run));
+            thread.Start();
+        }
+
+        /// <summary>Puts the value to the queue, which will be processed by the handler later.
+        /// If the queue is full, BlockingCollection.Add may block until space is available.</summary>
+        /// <param name="value">the value</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            queue.Add(value);
+        }
+
+        /// <summary>Closes the stage</summary>
+        public override void Dispose()
+        {
+            // Raise the producer's own flag first: a bool handed to its constructor is only a copy
+            producer.Interrupt();
+            thread.Interrupt();
+        }
+    }
+
+    /// <summary>Takes events from the queue and provides them to the handler</summary>
+    /// <typeparam name="T">The type</typeparam>
+    internal class Producer<T>
+    {
+        private readonly BlockingCollection<T> _queue;
+        private readonly IEventHandler<T> _handler;
+        private volatile bool _interrupted;
+
+        internal Producer(BlockingCollection<T> queue, IEventHandler<T> handler, bool interrupted)
+        {
+            _queue = queue;
+            _handler = handler;
+            _interrupted = interrupted;
+        }
+
+        /// <summary>Marks this producer as interrupted so that Run stops at the next exception it catches</summary>
+        internal void Interrupt()
+        {
+            _interrupted = true;
+        }
+
+        public void Run()
+        {
+            while (true)
+            {
+                try
+                {
+                    T value = _queue.Take();
+                    _handler.OnNext(value);
+                }
+                catch (Exception) // any exception is swallowed; the loop only ends once the flag is set
+                {
+                    if (_interrupted)
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs
new file mode 100644
index 0000000..c6a7b22
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+using Org.Apache.REEF.Wake;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>A stage whose OnNext runs the event handler directly, before returning to the caller</summary>
+    public class SyncStage<T> : AbstractEStage<T>
+    {
+        private readonly IEventHandler<T> _handler;
+
+        /// <summary>Creates a synchronous stage, passing the handler's full type name to the base stage</summary>
+        /// <param name="handler">the event handler this stage delegates to</param>
+        public SyncStage(IEventHandler<T> handler)
+            : base(handler.GetType().FullName)
+        {
+            this._handler = handler;
+        }
+
+        /// <summary>Passes the event to the base stage first, then to the handler</summary>
+        /// <param name="value">the event to process</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            this._handler.OnNext(value);
+        }
+
+        /// <summary>Does nothing; the body is intentionally empty</summary>
+        public override void Dispose() { }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs
new file mode 100644
index 0000000..f1bb67d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Stage that hands each event to a task service, which runs the event handler</summary>
+    public class ThreadPoolStage<T> : AbstractEStage<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ThreadPoolStage<T>));
+
+        private readonly IEventHandler<T> _handler;
+        private readonly ITaskService _taskService;
+        private readonly int _numThreads;
+
+        /// <summary>Creates a stage backed by a new FixedThreadPoolTaskService that it owns</summary>
+        /// <param name="handler">An event handler to execute</param>
+        /// <param name="numThreads">The number of threads to use; values of 0 or less are rejected</param>
+        public ThreadPoolStage(IEventHandler<T> handler, int numThreads)
+            : base(handler.GetType().FullName)
+        {
+            _handler = handler;
+            if (numThreads <= 0)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
+            }
+            _numThreads = numThreads;
+            _taskService = new FixedThreadPoolTaskService(numThreads);
+        }
+
+        /// <summary>Creates a stage that runs on a task service supplied by the caller</summary>
+        /// <param name="handler">an event handler to execute</param>
+        /// <param name="taskService">an external executor service provided</param>
+        public ThreadPoolStage(IEventHandler<T> handler, ITaskService taskService)
+            : base(handler.GetType().FullName)
+        {
+            _taskService = taskService;
+            _numThreads = 0;   // zero marks the service as external, so Dispose does not shut it down
+            _handler = handler;
+        }
+
+        /// <summary>Submits the event to the task service, which calls the handler</summary>
+        /// <param name="value">an event</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            HandlerInvocation invocation = new HandlerInvocation(this, value);
+            _taskService.Execute(invocation.Start);
+        }
+
+        /// <summary>Shuts down the task service, but only when this stage created it itself</summary>
+        public override void Dispose()
+        {
+            if (_numThreads <= 0)
+            {
+                return;
+            }
+            _taskService.Shutdown();
+        }
+
+        /// <summary>Pairs one event with the enclosing stage so the handler call can be run later</summary>
+        private sealed class HandlerInvocation : IStartable
+        {
+            private readonly ThreadPoolStage<T> _stage;
+            private readonly T _event;
+
+            public HandlerInvocation(ThreadPoolStage<T> stage, T evt)
+            {
+                _stage = stage;
+                _event = evt;
+            }
+
+            /// <summary>Invokes the stage's handler with the stored event</summary>
+            public void Start()
+            {
+                _stage._handler.OnNext(_event);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs
new file mode 100644
index 0000000..f2f2f35
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Timers;
+
+using Org.Apache.REEF.Wake;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Stage that triggers an event handler periodically</summary>
+    public class TimerStage : IStage
+    {
+        private readonly Timer _timer;
+        private readonly PeriodicEvent _value = new PeriodicEvent();
+        private readonly IEventHandler<PeriodicEvent> _handler;
+
+        /// <summary>Constructs a timer stage with an initial delay of 0</summary>
+        /// <param name="handler">an event handler</param>
+        /// <param name="period">a period in milli-seconds</param>
+        public TimerStage(IEventHandler<PeriodicEvent> handler, long period) : this(handler, 0, period)
+        {
+        }
+
+        /// <summary>Constructs a timer stage and enables its timer immediately</summary>
+        /// <param name="handler">an event handler</param>
+        /// <param name="initialDelay">an initial delay (NOTE: accepted but currently not used by this constructor)</param>
+        /// <param name="period">a period in milli-seconds</param>
+        public TimerStage(IEventHandler<PeriodicEvent> handler, long initialDelay, long period)
+        {
+            _handler = handler;
+            _timer = new Timer(period);
+            _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _handler, _value);
+            _timer.Enabled = true;
+        }
+
+        /// <summary>Closes resources: stops the timer and disposes it</summary>
+        public void Dispose()
+        {
+            _timer.Stop();
+            // Stopping alone does not release the timer's resources, so dispose it as well
+            _timer.Dispose();
+        }
+
+        /// <summary>Forwards one elapsed-timer notification to the handler</summary>
+        private static void OnTimedEvent(object source, ElapsedEventArgs e, IEventHandler<PeriodicEvent> handler, PeriodicEvent value)
+        {
+            handler.OnNext(value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
new file mode 100644
index 0000000..f383707
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -0,0 +1,216 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProjectGuid>{CDFB3464-4041-42B1-9271-83AF24CD5008}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Org.Apache.REEF.Wake</RootNamespace>
+    <AssemblyName>Org.Apache.REEF.Wake</AssemblyName>
+    <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <RestorePackages>true</RestorePackages>
+    <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
+  </PropertyGroup>
+  <Import Project="$(SolutionDir)\Source\build.props" />
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="protobuf-net">
+      <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath>
+    </Reference>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Reactive.Core">
+      <HintPath>$(PackagesDir)\Rx-Core.$(RxVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
+    </Reference>
+    <Reference Include="System.Reactive.Interfaces">
+      <HintPath>$(PackagesDir)\Rx-Interfaces.$(RxVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+    </Reference>
+    <Reference Include="System.Runtime.Serialization" />
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="AbstractEStage.cs" />
+    <Compile Include="IEStage.cs" />
+    <Compile Include="IEventHandler.cs" />
+    <Compile Include="IIdentifier.cs" />
+    <Compile Include="IIdentifierFactory.cs" />
+    <Compile Include="Impl\LoggingEventHandler.cs" />
+    <Compile Include="Impl\MissingStartHandlerHandler.cs" />
+    <Compile Include="Impl\MultiEventHandler.cs" />
+    <Compile Include="Impl\PeriodicEvent.cs" />
+    <Compile Include="Impl\PubSubEventHandler.cs" />
+    <Compile Include="Impl\SingleThreadStage.cs" />
+    <Compile Include="Impl\SyncStage.cs" />
+    <Compile Include="Impl\ThreadPoolStage.cs" />
+    <Compile Include="Impl\TimerStage.cs" />
+    <Compile Include="IObserverFactory.cs" />
+    <Compile Include="IStage.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Remote\ICodec.cs" />
+    <Compile Include="Remote\ICodecFactory.cs" />
+    <Compile Include="Remote\IDecoder.cs" />
+    <Compile Include="Remote\IEncoder.cs" />
+    <Compile Include="Remote\ILink.cs" />
+    <Compile Include="Remote\Impl\ByteCodec.cs" />
+    <Compile Include="Remote\Impl\ByteCodecFactory.cs" />
+    <Compile Include="Remote\Impl\Channel.cs" />
+    <Compile Include="Remote\Impl\DefaultRemoteManager.cs" />
+    <Compile Include="Remote\Impl\DefaultRemoteMessage.cs" />
+    <Compile Include="Remote\Impl\IntCodec.cs" />
+    <Compile Include="Remote\Impl\IPEndpointComparer.cs" />
+    <Compile Include="Remote\Impl\Link.cs" />
+    <Compile Include="Remote\Impl\MultiCodec.cs" />
+    <Compile Include="Remote\Impl\MultiDecoder.cs" />
+    <Compile Include="Remote\Impl\MultiEncoder.cs" />
+    <Compile Include="Remote\Impl\ObserverContainer.cs" />
+    <Compile Include="Remote\Impl\RemoteEvent.cs" />
+    <Compile Include="Remote\Impl\RemoteEventCodec.cs" />
+    <Compile Include="Remote\Impl\RemoteEventDecoder.cs" />
+    <Compile Include="Remote\Impl\RemoteEventEncoder.cs" />
+    <Compile Include="Remote\Impl\RemoteEventEndpoint.cs" />
+    <Compile Include="Remote\Impl\SocketRemoteIdentifier.cs" />
+    <Compile Include="Remote\Impl\StringCodec.cs" />
+    <Compile Include="Remote\Impl\StringIdentifier.cs" />
+    <Compile Include="Remote\Impl\StringIdentifierFactory.cs" />
+    <Compile Include="Remote\Impl\TransportClient.cs" />
+    <Compile Include="Remote\Impl\TransportEvent.cs" />
+    <Compile Include="Remote\Impl\TransportServer.cs" />
+    <Compile Include="Remote\IRemoteEvent.cs" />
+    <Compile Include="Remote\IRemoteIdentifier.cs" />
+    <Compile Include="Remote\IRemoteIdentifierFactory.cs" />
+    <Compile Include="Remote\IRemoteManager.cs" />
+    <Compile Include="Remote\IRemoteMessage.cs" />
+    <Compile Include="Remote\ISubscriptionManager.cs" />
+    <Compile Include="Remote\Proto\WakeRemoteProtos.cs" />
+    <Compile Include="Remote\RemoteConfiguration.cs" />
+    <Compile Include="Remote\RemoteRuntimeException.cs" />
+    <Compile Include="RX\AbstractObserver.cs" />
+    <Compile Include="RX\AbstractRxStage.cs" />
+    <Compile Include="RX\Impl\PubSubSubject.cs" />
+    <Compile Include="RX\Impl\RxSyncStage.cs" />
+    <Compile Include="RX\Impl\RxThreadPoolStage.cs" />
+    <Compile Include="RX\Impl\RxTimerStage.cs" />
+    <Compile Include="RX\Impl\SimpleSubject.cs" />
+    <Compile Include="RX\IRxStage.cs" />
+    <Compile Include="RX\IStaticObservable.cs" />
+    <Compile Include="RX\ISubject.cs" />
+    <Compile Include="RX\ObserverCompletedException.cs" />
+    <Compile Include="src\main\cs\Examples\P2p\IEventSource.cs" />
+    <Compile Include="src\main\cs\Examples\P2p\Pull2Push.cs" />
+    <Compile Include="src\main\cs\PeriodicEvent.cs" />
+    <Compile Include="Protobuf\WakeRemoteProtosGen.cs" />
+    <Compile Include="Time\Event\Alarm.cs" />
+    <Compile Include="Time\Event\StartTime.cs" />
+    <Compile Include="Time\Event\StopTime.cs" />
+    <Compile Include="Time\IClock.cs" />
+    <Compile Include="Time\Runtime\Event\ClientAlarm.cs" />
+    <Compile Include="Time\Runtime\Event\IdleClock.cs" />
+    <Compile Include="Time\Runtime\Event\RuntimeAlarm.cs" />
+    <Compile Include="Time\Runtime\Event\RuntimeStart.cs" />
+    <Compile Include="Time\Runtime\Event\RuntimeStop.cs" />
+    <Compile Include="Time\Runtime\ITimer.cs" />
+    <Compile Include="Time\Runtime\LogicalTimer.cs" />
+    <Compile Include="Time\Runtime\RealTimer.cs" />
+    <Compile Include="Time\Runtime\RuntimeClock.cs" />
+    <Compile Include="Time\Time.cs" />
+    <Compile Include="Util\Actionable.cs" />
+    <Compile Include="Util\Disposable.cs" />
+    <Compile Include="Util\FixedThreadPoolTaskService.cs" />
+    <Compile Include="Util\IStartable.cs" />
+    <Compile Include="Util\ITaskService.cs" />
+    <Compile Include="Util\LimitedConcurrencyLevelTaskScheduler.cs" />
+    <Compile Include="Util\NetworkUtils.cs" />
+    <Compile Include="Util\SerializationHelper.cs" />
+    <Compile Include="Util\TaskExtensions.cs" />
+    <Compile Include="Util\TimeHelper.cs" />
+    <Compile Include="WakeRuntimeException.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="packages.config" />
+    <None Include="Protobuf\RemoteProtocol.proto" />
+  </ItemGroup>
+  <ItemGroup>
+    <Folder Include="Time\Time\Event\" />
+    <Folder Include="Time\Time\Runtime\Event\" />
+    <Folder Include="Util\Util\" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+      <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+      <Name>Org.Apache.REEF.Tang</Name>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+      <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+      <Name>Org.Apache.REEF.Utilities</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..48a4764
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following 
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Wake")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Wake")]
+[assembly: AssemblyCopyright("Copyright ©  2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible 
+// to COM components.  If you need to access a type in this assembly from 
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("86a66ac8-0c8e-4652-b533-670e800cb0ea")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version 
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers 
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto b/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto
new file mode 100644
index 0000000..cd28d13
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+option java_package = "org.apache.reef.wake.remote.proto";
+option java_outer_classname = "WakeRemoteProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Message carrying a required byte payload and 64-bit `seq` value, plus
+// optional `source` and `sink` strings.
+message WakeMessagePBuf {
+  required bytes data = 1; // opaque bytes; the format is not defined in this file
+  required int64 seq = 2; // presumably a sequence number - TODO confirm against the sender
+  optional string source = 3; // presumably identifies the sending side - TODO confirm
+  optional string sink = 4; // presumably identifies the receiving side - TODO confirm
+}
+
+// Message pairing a required class name with a required byte payload.
+message WakeTuplePBuf {
+  required string className = 1; // presumably names the type encoded in `data` - TODO confirm
+  required bytes data = 2; // opaque bytes; the format is not defined in this file
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs b/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs
new file mode 100644
index 0000000..f3c59f8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//------------------------------------------------------------------------------
+// <auto-generated>
+//     This code was generated by a tool.
+//
+//     Changes to this file may cause incorrect behavior and will be lost if
+//     the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: src/main/proto/RemoteProtocol.proto
+namespace Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos
+{
+  /// <summary>
+  /// protobuf-net contract class for the WakeMessagePBuf message: required members
+  /// data (tag 1) and seq (tag 2), optional members source (tag 3) and sink (tag 4).
+  /// This class is tool-generated; regenerate it from the .proto rather than editing by hand.
+  /// </summary>
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeMessagePBuf")]
+  public partial class WakeMessagePBuf : global::ProtoBuf.IExtensible
+  {
+    public WakeMessagePBuf() {}
+    
+    private byte[] _data;
+    /// <summary>Required byte payload (tag 1).</summary>
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public byte[] data
+    {
+      get { return _data; }
+      set { _data = value; }
+    }
+    private long _seq;
+    /// <summary>Required 64-bit value (tag 2), serialized with the TwosComplement data format.</summary>
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"seq", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public long seq
+    {
+      get { return _seq; }
+      set { _seq = value; }
+    }
+    private string _source = "";
+    /// <summary>Optional string (tag 3); defaults to the empty string.</summary>
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"source", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue("")]
+    public string source
+    {
+      get { return _source; }
+      set { _source = value; }
+    }
+    private string _sink = "";
+    /// <summary>Optional string (tag 4); defaults to the empty string.</summary>
+    [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"sink", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue("")]
+    public string sink
+    {
+      get { return _sink; }
+      set { _sink = value; }
+    }
+    // Storage handed to protobuf-net through the explicit IExtensible implementation below.
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+  /// <summary>
+  /// protobuf-net contract class for the WakeTuplePBuf message: required members
+  /// className (tag 1) and data (tag 2).
+  /// This class is tool-generated; regenerate it from the .proto rather than editing by hand.
+  /// </summary>
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeTuplePBuf")]
+  public partial class WakeTuplePBuf : global::ProtoBuf.IExtensible
+  {
+    public WakeTuplePBuf() {}
+    
+    private string _className;
+    /// <summary>Required string (tag 1).</summary>
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"className", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string className
+    {
+      get { return _className; }
+      set { _className = value; }
+    }
+    private byte[] _data;
+    /// <summary>Required byte payload (tag 2).</summary>
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public byte[] data
+    {
+      get { return _data; }
+      set { _data = value; }
+    }
+    // Storage handed to protobuf-net through the explicit IExtensible implementation below.
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs
new file mode 100644
index 0000000..3c3451b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+    /// <summary>
+    /// Base observer whose OnError and OnCompleted implementations only log the
+    /// notification; subclasses supply OnNext and may override the other two.
+    /// </summary>
+    /// <typeparam name="T">The type of the observed events</typeparam>
+    public abstract class AbstractObserver<T> : IObserver<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(AbstractObserver<T>));
+
+        /// <summary>Logs the received error at Info level.</summary>
+        /// <param name="error">The error reported to this observer</param>
+        public virtual void OnError(Exception error)
+        {
+            // A space now precedes "has"; without it the runtime type name ran straight into the message text.
+            LOGGER.Log(Level.Info, "The observer " + GetType() + " has received an Exception: " + error);
+        }
+
+        /// <summary>Logs the completion notification at Verbose level.</summary>
+        public virtual void OnCompleted()
+        {
+            // Same missing-space fix as in OnError.
+            LOGGER.Log(Level.Verbose, "The observer " + GetType() + " has received an onCompleted() ");
+        }
+
+        /// <summary>Handles the next event.</summary>
+        /// <param name="arg1">The event</param>
+        public abstract void OnNext(T arg1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs
new file mode 100644
index 0000000..5238935
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+    /// <summary>
+    /// Base class for Rx stages. Metering is not active here: the meter name is accepted
+    /// by the constructor but unused, and OnNext and Dispose are no-ops unless overridden.
+    /// </summary>
+    /// <typeparam name="T">The type of events the stage observes</typeparam>
+    public abstract class AbstractRxStage<T> : IRxStage<T>
+    {
+        /// <summary>Constructs an abstract rx stage</summary>
+        /// <param name="meterName">the name of the meter (currently unused)</param>
+        public AbstractRxStage(string meterName)
+        {
+            // Nothing to initialise while no meter is created.
+        }
+
+        /// <summary>Handles an error notification; supplied by the concrete stage.</summary>
+        /// <param name="error">the reported error</param>
+        public abstract void OnError(Exception error);
+
+        /// <summary>Handles the completion notification; supplied by the concrete stage.</summary>
+        public abstract void OnCompleted();
+
+        /// <summary>Base handling of an event: does nothing while no meter is updated.</summary>
+        /// <param name="value">the event</param>
+        public virtual void OnNext(T value) { }
+
+        /// <summary>Releases resources; the base class holds none.</summary>
+        public virtual void Dispose() { }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs
new file mode 100644
index 0000000..bc93a0e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+    /// <summary>
+    /// A stage that is also an observer: combines <see cref="IObserver{T}"/> with
+    /// <see cref="IStage"/> and declares no members of its own.
+    /// </summary>
+    /// <typeparam name="T">The type of events the stage observes</typeparam>
+    public interface IRxStage<T> : IObserver<T>, IStage
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs b/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs
new file mode 100644
index 0000000..035953c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+    /// <summary>
+    /// Marker interface that declares no members.
+    /// </summary>
+    public interface IStaticObservable
+    {
+        // intentionally empty
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs
new file mode 100644
index 0000000..9829dfd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+    /// <summary>
+    /// An interface combining <see cref="IObserver{T}"/> over the input type with
+    /// <see cref="IStaticObservable"/>; it declares no members of its own.
+    /// </summary>
+    /// <typeparam name="In">The in type, i.e. the type of events observed</typeparam>
+    /// <typeparam name="Out">The out type (not referenced by any member of this interface)</typeparam>
+    public interface ISubject<In, Out> : IObserver<In>, IStaticObservable
+    {
+        // intentionally empty
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs
new file mode 100644
index 0000000..73962dd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Subjects;
+using System.Reflection;
+using System.Text;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+    /// <summary>
+    /// Subject that provides a publish/subscribe interface.
+    /// Observers subscribe for a concrete event type, and OnNext dispatches each
+    /// event to the observers registered for exactly the event's runtime type
+    /// (observers registered for a base type do not receive derived events).
+    /// </summary>
+    /// <typeparam name="T">The super type that all event types
+    /// inherit from</typeparam>
+    public class PubSubSubject<T> : IObserver<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubSubject<T>));
+
+        // Maps an event type to the IObserver instances subscribed for it.
+        private readonly Dictionary<Type, List<object>> _classToObserversMap;
+
+        // Guards _classToObserversMap and _completed.
+        private readonly object _mutex;
+
+        // Set by OnCompleted/OnError; once true, OnNext drops events.
+        private bool _completed;
+
+        /// <summary>
+        /// Constructs a pub-sub Subject with no subscriptions
+        /// </summary>
+        public PubSubSubject()
+        {
+            _classToObserversMap = new Dictionary<Type, List<object>>();
+            _mutex = new object();
+        }
+
+        /// <summary>
+        /// Marks the subject as completed so that later calls to OnNext are
+        /// ignored. Subscribed observers are not notified.
+        /// </summary>
+        public void OnCompleted()
+        {
+            lock (_mutex)
+            {
+                _completed = true;
+            }
+        }
+
+        /// <summary>
+        /// Marks the subject as completed so that later calls to OnNext are
+        /// ignored. The error is neither logged nor forwarded to observers.
+        /// </summary>
+        /// <param name="error">The error that ended the stream (unused)</param>
+        public void OnError(Exception error)
+        {
+            lock (_mutex)
+            {
+                _completed = true;
+            }
+        }
+
+        /// <summary>
+        /// Invoke the subscribed handlers for the event class type.
+        /// A null event is reported through Exceptions.Throw with an
+        /// ArgumentNullException; an event whose type was never subscribed is
+        /// reported with an ArgumentException.
+        /// </summary>
+        /// <param name="value">The event to process</param>
+        public void OnNext(T value)
+        {
+            if (value == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+            }
+
+            lock (_mutex)
+            {
+                // If OnCompleted or OnError called, do nothing
+                if (_completed)
+                {
+                    return;
+                }
+
+                Type eventType = value.GetType();
+
+                // Check that the event type has been subscribed
+                List<object> handlers;
+                if (!_classToObserversMap.TryGetValue(eventType, out handlers))
+                {
+                    Exceptions.Throw(new ArgumentException("No event for type " + eventType), LOGGER);
+                }
+
+                // Dispatch over a snapshot: the lock is reentrant, so a handler
+                // may subscribe or unsubscribe for this event type from inside
+                // its OnNext, which would otherwise modify the list while it is
+                // being enumerated. Changes made during dispatch take effect
+                // from the next event onwards.
+                object[] snapshot = handlers.ToArray();
+
+                // The reflected OnNext is the same for every handler of this
+                // event type, so resolve it once.
+                MethodInfo onNext = typeof(IObserver<>).MakeGenericType(new[] { eventType }).GetMethod("OnNext");
+
+                foreach (object handler in snapshot)
+                {
+                    onNext.Invoke(handler, new[] { (object) value });
+                }
+            }
+        }
+
+        /// <summary>
+        /// Subscribe an IObserver for an event type
+        /// </summary>
+        /// <typeparam name="U">The event type</typeparam>
+        /// <param name="observer">The observer to handle the event</param>
+        /// <returns>An IDisposable object used to handle unsubscribing
+        /// the IObserver</returns>
+        public IDisposable Subscribe<U>(IObserver<U> observer) where U : T
+        {
+            lock (_mutex)
+            {
+                List<object> observers;
+                if (!_classToObserversMap.TryGetValue(typeof(U), out observers))
+                {
+                    observers = new List<object>();
+                    _classToObserversMap[typeof(U)] = observers;
+                }
+                observers.Add(observer);
+            }
+
+            return new DisposableResource<U>(_classToObserversMap, observer, _mutex);
+        }
+
+        /// <summary>
+        /// Utility class to handle disposing of an IObserver
+        /// </summary>
+        private class DisposableResource<U> : IDisposable
+        {
+            private readonly Dictionary<Type, List<object>> _observersMap;
+            private readonly IObserver<U> _observer;
+            private readonly object _mutex;
+            private bool _disposed;
+
+            public DisposableResource(Dictionary<Type, List<object>> observersMap, IObserver<U> observer, object mutex)
+            {
+                _observersMap = observersMap;
+                _observer = observer;
+                _mutex = mutex;
+                _disposed = false;
+            }
+
+            /// <summary>
+            /// Unsubscribe the IObserver from the observer map. Only the first
+            /// call removes anything; later calls are no-ops.
+            /// </summary>
+            public void Dispose()
+            {
+                lock (_mutex)
+                {
+                    // The flag is read and written under the same lock as the
+                    // map so that concurrent Dispose calls remove at most one
+                    // registration.
+                    if (_disposed)
+                    {
+                        return;
+                    }
+                    _disposed = true;
+
+                    List<object> observers;
+                    if (_observersMap.TryGetValue(typeof(U), out observers))
+                    {
+                        observers.Remove(_observer);
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs
new file mode 100644
index 0000000..e2527a0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+using System;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+    /// <summary>
+    /// Rx stage that hands every notification to its observer directly on the
+    /// calling thread.
+    /// </summary>
+    public class RxSyncStage<T> : AbstractRxStage<T>
+    {
+        private readonly IObserver<T> _target;
+
+        /// <summary>Creates a synchronous stage around the given observer</summary>
+        /// <param name="observer">the observer that receives all notifications;
+        /// the full name of its type is passed to the base stage</param>
+        public RxSyncStage(IObserver<T> observer)
+            : base(observer.GetType().FullName)
+        {
+            _target = observer;
+        }
+
+        /// <summary>
+        /// Runs the base stage's OnNext and then passes the value to the observer
+        /// </summary>
+        /// <param name="value">the value to deliver</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            _target.OnNext(value);
+        }
+
+        /// <summary>Passes an error condition straight to the observer</summary>
+        /// <param name="error">the error to deliver</param>
+        public override void OnError(Exception error)
+        {
+            _target.OnError(error);
+        }
+
+        /// <summary>
+        /// Tells the observer that no further notifications will be sent
+        /// </summary>
+        public override void OnCompleted()
+        {
+            _target.OnCompleted();
+        }
+
+        /// <summary>
+        /// Closes the stage; this implementation does nothing
+        /// </summary>
+        public override void Dispose()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs
new file mode 100644
index 0000000..d329430
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Util;
+using System;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+    /// <summary>
+    /// Rx stage that hands each notification for its observer to a
+    /// FixedThreadPoolTaskService instead of delivering it inline.
+    /// </summary>
+    public class RxThreadPoolStage<T> : AbstractRxStage<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RxThreadPoolStage<T>));
+
+        private readonly IObserver<T> _observer;
+
+        private readonly ITaskService _taskService;
+
+        /// <summary>Creates a thread pool stage around the given observer</summary>
+        /// <param name="observer">the observer that receives the notifications</param>
+        /// <param name="numThreads">the size passed to the task service; values
+        /// of zero or less are reported through Exceptions.Throw as a
+        /// WakeRuntimeException</param>
+        public RxThreadPoolStage(IObserver<T> observer, int numThreads)
+            : base(observer.GetType().FullName)
+        {
+            _observer = observer;
+            if (numThreads <= 0)
+            {
+                Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
+            }
+            _taskService = new FixedThreadPoolTaskService(numThreads);
+        }
+
+        /// <summary>
+        /// Runs the base stage's OnNext, then submits delivery of the value to
+        /// the task service
+        /// </summary>
+        /// <param name="value">the value to deliver</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            _taskService.Execute(() => _observer.OnNext(value));
+        }
+
+        /// <summary>
+        /// Submits delivery of an error condition to the task service
+        /// </summary>
+        /// <param name="error">the error to deliver</param>
+        public override void OnError(Exception error)
+        {
+            _taskService.Execute(() => _observer.OnError(error));
+        }
+
+        /// <summary>
+        /// Submits delivery of the completion notification to the task service
+        /// </summary>
+        public override void OnCompleted()
+        {
+            _taskService.Execute(() => _observer.OnCompleted());
+        }
+
+        /// <summary>
+        /// Closes the stage by shutting down the task service
+        /// </summary>
+        public override void Dispose()
+        {
+            _taskService.Shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs
new file mode 100644
index 0000000..541c2d4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Timers;
+
+using Org.Apache.REEF.Wake.Impl;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+    /// <summary>Timer stage that provides events to the observer periodically</summary>
+    public class RxTimerStage : IStage, IStaticObservable
+    {
+        private readonly Timer _timer;
+
+        // The same event instance is handed to the observer on every tick.
+        private readonly PeriodicEvent _value = new PeriodicEvent();
+        private readonly IObserver<PeriodicEvent> _observer;
+
+        /// <summary>Constructs a Rx timer stage</summary>
+        /// <param name="observer">the observer</param>
+        /// <param name="period">the period in milli-seconds</param>
+        public RxTimerStage(IObserver<PeriodicEvent> observer, long period) 
+            : this(observer, 0, period)
+        {
+        }
+
+        /// <summary>
+        /// Constructs a Rx timer stage and starts its timer immediately
+        /// </summary>
+        /// <param name="observer">the observer</param>
+        /// <param name="initialDelay">the initial delay in milli-seconds.
+        /// NOTE(review): this value is currently not used; the timer is created
+        /// from <paramref name="period"/> alone. Honoring it would change the
+        /// timing seen by existing callers, so it is left as is.</param>
+        /// <param name="period">the period in milli-seconds, used as the
+        /// interval of the underlying System.Timers.Timer</param>
+        public RxTimerStage(IObserver<PeriodicEvent> observer, long initialDelay, long period)
+        {
+            _observer = observer;
+            _timer = new Timer(period);
+            _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _observer, _value);
+            _timer.Enabled = true;
+        }
+
+        /// <summary>
+        /// Closes the stage: stops the timer and releases its resources
+        /// </summary>
+        public void Dispose()
+        {
+            _timer.Stop();
+
+            // Stop() alone leaves the timer's resources allocated until
+            // finalization; the stage owns the timer, so release it here.
+            _timer.Dispose();
+        }
+
+        // Elapsed callback: passes the periodic event to the observer.
+        private static void OnTimedEvent(object source, ElapsedEventArgs e, IObserver<PeriodicEvent> observer, PeriodicEvent value)
+        {
+            observer.OnNext(value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs
new file mode 100644
index 0000000..7d4aaa8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+using System;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+    /// <summary>
+    /// A Subject that passes every notification it receives on to a single
+    /// observer, unchanged.
+    /// </summary>
+    public class SimpleSubject<T> : ISubject<T, T>
+    {
+        private readonly IObserver<T> _downstream;
+
+        /// <summary>Creates a subject that relays to the given observer</summary>
+        /// <param name="observer">the observer that receives the relayed
+        /// notifications</param>
+        public SimpleSubject(IObserver<T> observer)
+        {
+            _downstream = observer;
+        }
+
+        /// <summary>Relays a value to the observer</summary>
+        /// <param name="value">the value to relay</param>
+        public virtual void OnNext(T value)
+        {
+            _downstream.OnNext(value);
+        }
+
+        /// <summary>Relays an error to the observer</summary>
+        /// <param name="error">the error to relay</param>
+        public virtual void OnError(Exception error)
+        {
+            _downstream.OnError(error);
+        }
+
+        /// <summary>
+        /// Relays to the observer that no further push-based notifications
+        /// will be sent.
+        /// </summary>
+        public virtual void OnCompleted()
+        {
+            _downstream.OnCompleted();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs b/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs
new file mode 100644
index 0000000..e620a7c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+    /// <summary>
+    /// It is illegal to call onError() or onCompleted() when a call to onNext() is
+    /// still outstanding, or to call onNext(), onError() or onCompleted() after a
+    /// call to onError() or onCompleted() has been dispatched.
+    /// </summary>
+    /// <remarks>
+    /// It is illegal to call onError() or onCompleted() when a call to onNext() is
+    /// still outstanding, or to call onNext(), onError() or onCompleted() after a
+    /// call to onError() or onCompleted() has been dispatched. Observers may throw
+    /// an ObserverCompleted exception whenever this API is violated. Violating the
+    /// API leaves the Observer (and any resources that it holds) in an undefined
+    /// state, and throwing ObserverCompleted exceptions is optional.
+    /// Callers receiving this exception should simply pass it up the stack to the
+    /// Aura runtime. They should not attempt to forward it on to upstream or
+    /// downstream stages. The easiest way to do this is to ignore the exception
+    /// entirely.
+    /// </remarks>
+    [System.Serializable]
+    public class ObserverCompletedException : InvalidOperationException
+    {
+        /// <summary>Creates the exception with the default message</summary>
+        public ObserverCompletedException()
+        {
+        }
+
+        /// <summary>Creates the exception with the given message</summary>
+        /// <param name="message">description of the violation</param>
+        public ObserverCompletedException(string message)
+            : base(message)
+        {
+        }
+
+        /// <summary>Creates the exception with a message and a cause</summary>
+        /// <param name="message">description of the violation</param>
+        /// <param name="innerException">the exception that caused this one</param>
+        public ObserverCompletedException(string message, Exception innerException)
+            : base(message, innerException)
+        {
+        }
+
+        /// <summary>
+        /// Deserialization constructor. A [Serializable] exception type needs
+        /// it in order to be reconstructed from serialized data.
+        /// </summary>
+        /// <param name="info">the serialized object data</param>
+        /// <param name="context">the source or destination context</param>
+        protected ObserverCompletedException(
+            System.Runtime.Serialization.SerializationInfo info,
+            System.Runtime.Serialization.StreamingContext context)
+            : base(info, context)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs
new file mode 100644
index 0000000..2591d1e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Non-generic base interface of <see cref="ICodec{T}"/>; declares no members.
+    /// </summary>
+    public interface ICodec
+    {
+    }
+
+    /// <summary>
+    /// Interface for serialization routines that translate back and forth between
+    /// objects of type <typeparamref name="T"/> and byte arrays. It combines
+    /// IEncoder and IDecoder for the same type and adds no members of its own.
+    /// </summary>
+    /// <typeparam name="T">The type of object that is encoded and decoded</typeparam>
+    public interface ICodec<T> : ICodec, IEncoder<T>, IDecoder<T>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs
new file mode 100644
index 0000000..68b656a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Factory interface with a single parameterless Create method.
+    /// ByteCodecFactory is declared as its default implementation.
+    /// </summary>
+    [DefaultImplementation(typeof(ByteCodecFactory))]
+    public interface ICodecFactory
+    {
+        /// <summary>Creates a new object</summary>
+        /// <returns>the created object, typed as object; presumably a codec
+        /// instance - confirm against the implementations</returns>
+        object Create(); 
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs
new file mode 100644
index 0000000..07dd28a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Non-generic base interface of <see cref="IDecoder{T}"/>; declares no members.
+    /// </summary>
+    public interface IDecoder
+    {
+    }
+
+    /// <summary>
+    /// Interface for deserialization routines that translate byte arrays into
+    /// objects of type <typeparamref name="T"/>.
+    /// </summary>
+    /// <typeparam name="T">The type of object produced by decoding</typeparam>
+    public interface IDecoder<T> : IDecoder
+    {
+        /// <summary>Decodes the given byte array into an object</summary>
+        /// <param name="data">the bytes to decode</param>
+        /// <returns>the decoded object</returns>
+        T Decode(byte[] data);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs
new file mode 100644
index 0000000..b399131
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Non-generic base interface of <see cref="IEncoder{T}"/>; declares no members.
+    /// </summary>
+    public interface IEncoder
+    {
+    }
+
+    /// <summary>
+    /// Interface for serialization routines that translate objects of type
+    /// <typeparamref name="T"/> into byte arrays.
+    /// </summary>
+    /// <typeparam name="T">The type of object accepted for encoding</typeparam>
+    public interface IEncoder<T> : IEncoder
+    {
+        /// <summary>Encodes the given object into a Byte Array</summary>
+        /// <param name="obj">the object to encode</param>
+        /// <returns>a byte[] representation of the object</returns>
+        byte[] Encode(T obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs
new file mode 100644
index 0000000..c0ce1a2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Represents a link between two endpoints
+    /// </summary>
+    /// <typeparam name="T">The type of value written to and read from the link</typeparam>
+    public interface ILink<T> : IDisposable
+    {
+        /// <summary>
+        /// Returns the local socket address
+        /// </summary>
+        IPEndPoint LocalEndpoint { get; }
+
+        /// <summary>
+        /// Returns the remote socket address
+        /// </summary>
+        IPEndPoint RemoteEndpoint { get; }
+
+        /// <summary>
+        /// Writes the value to this link asynchronously
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>A task for the asynchronous write</returns>
+        Task WriteAsync(T value, CancellationToken token);
+
+        /// <summary>
+        /// Writes the value to this link synchronously
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        void Write(T value);
+
+        /// <summary>
+        /// Reads the value from this link asynchronously
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>A task whose result is the read data</returns>
+        Task<T> ReadAsync(CancellationToken token);
+
+        /// <summary>
+        /// Reads the value from this link synchronously
+        /// </summary>
+        /// <returns>The read data</returns>
+        T Read();
+    }
+}