You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:05:49 UTC
[28/51] [partial] incubator-reef git commit: [REEF-131] Towards the
new .Net project structure This is to change .Net project structure for Tang,
Wake, REEF utilities, Common and Driver:
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/IStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/IStage.cs b/lang/cs/Org.Apache.REEF.Wake/IStage.cs
new file mode 100644
index 0000000..d7121a8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/IStage.cs
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake
+{
+ /// <summary>An execution unit for events. It declares no members of its own; the inherited
+ /// <see cref="IDisposable.Dispose"/> is the way callers reclaim a stage's resources.</summary>
+ public interface IStage : IDisposable
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs
new file mode 100644
index 0000000..cd2bff5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/LoggingEventHandler.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Observer that logs every event; OnError and OnCompleted are not implemented and always throw</summary>
+    public class LoggingEventHandler<T> : IObserver<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(T));
+
+        [Inject]
+        public LoggingEventHandler()
+        {
+        }
+
+        /// <summary>Logs "Event: ", the current local time, a separating space and the event at verbose level</summary>
+        /// <param name="value">an event</param>
+        public void OnNext(T value)
+        {
+            LOGGER.Log(Level.Verbose, "Event: " + DateTime.Now + " " + value);
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs
new file mode 100644
index 0000000..324eb61
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/MissingStartHandlerHandler.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Time;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+ /// <summary>StartTime observer whose OnNext intentionally does nothing; OnError and OnCompleted always throw</summary>
+ public class MissingStartHandlerHandler : IObserver<StartTime>
+ {
+ [Inject]
+ public MissingStartHandlerHandler()
+ {
+ }
+
+ public void OnNext(StartTime value)
+ {
+ // Do nothing, since we only use this for evaluator, not for driver; the former "No binding to Clock.StartHandler" log call is disabled.
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs
new file mode 100644
index 0000000..b157c75
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/MultiEventHandler.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Event handler that dispatches an event to a specific handler based on an event class type</summary>
+    public class MultiEventHandler<T> : IEventHandler<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiEventHandler<T>));
+        private readonly IDictionary<Type, IEventHandler<T>> _map;
+
+        /// <summary>Constructs a multi-event handler; every key of the map must be a type assignable to T</summary>
+        /// <param name="map">a map of class types to event handlers; it is stored as given, not copied</param>
+        public MultiEventHandler(IDictionary<Type, IEventHandler<T>> map)
+        {
+            foreach (Type item in map.Keys)
+            {
+                if (!typeof(T).IsAssignableFrom(item))
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException(typeof(T) + " is not assignable from " + item), LOGGER);
+                }
+            }
+            _map = map;
+        }
+
+        /// <summary>Invokes the handler registered for the event's exact runtime type; reports an error if there is none</summary>
+        /// <param name="value">The event to handle; null is reported as an ArgumentNullException before any lookup</param>
+        public void OnNext(T value)
+        {
+            if (value == null)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+            }
+            IEventHandler<T> handler;
+            if (_map.TryGetValue(value.GetType(), out handler))
+            {
+                handler.OnNext(value);
+            }
+            else
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("No event " + value.GetType() + " handler"), LOGGER);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs
new file mode 100644
index 0000000..543d342
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/PeriodicEvent.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+ /// <summary>Periodic event for timers; it carries no data (TimerStage passes one shared instance to its handler on every tick)</summary>
+ public class PeriodicEvent
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs
new file mode 100644
index 0000000..38a36be
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/PubSubEventHandler.cs
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>
+    /// Event handler to provide publish/subscribe interfaces
+    /// </summary>
+    /// <typeparam name="T">The base type of the events that can be published</typeparam>
+    public class PubSubEventHandler<T> : IEventHandler<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubEventHandler<T>));
+
+        private readonly Dictionary<Type, List<object>> _classToHandlersMap;
+
+        /// <summary>Construct a pub-sub event handler with no subscriptions</summary>
+        public PubSubEventHandler()
+        {
+            _classToHandlersMap = new Dictionary<Type, List<object>>();
+        }
+
+        /// <summary>
+        /// Subscribe an event handler for an event type
+        /// </summary>
+        /// <typeparam name="U">The event type the handler accepts; published events are matched on exactly this type</typeparam>
+        /// <param name="handler">The event handler</param>
+        public void Subscribe<U>(IEventHandler<U> handler) where U : T
+        {
+            lock (_classToHandlersMap)
+            {
+                List<object> handlers;
+                if (!_classToHandlersMap.TryGetValue(typeof(U), out handlers))
+                {
+                    handlers = new List<object>();
+                    _classToHandlersMap[typeof(U)] = handlers;
+                }
+                handlers.Add(handler);
+            }
+        }
+
+        /// <summary>
+        /// Invoke, in subscription order, the handlers subscribed for the event's exact runtime type.
+        /// Handlers run while the subscription lock is held and are called through reflection, so an
+        /// exception thrown by a handler reaches the caller wrapped in a TargetInvocationException.
+        /// </summary>
+        /// <param name="value">The event to process; must not be null</param>
+        public void OnNext(T value)
+        {
+            if (value == null)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+            }
+
+            lock (_classToHandlersMap)
+            {
+                // Check that the event type has been subscribed
+                Type eventType = value.GetType();
+                List<object> handlers;
+                if (!_classToHandlersMap.TryGetValue(eventType, out handlers))
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException("No event for type " + eventType), LOGGER);
+                }
+
+                // Resolve IEventHandler<eventType>.OnNext once: it is the same method for every handler in the list
+                MethodInfo onNext = typeof(IEventHandler<>).MakeGenericType(eventType).GetMethod("OnNext");
+                foreach (object handler in handlers)
+                {
+                    onNext.Invoke(handler, new object[] { value });
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs
new file mode 100644
index 0000000..e27f67e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/SingleThreadStage.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Single thread stage that runs the event handler</summary>
+    public class SingleThreadStage<T> : AbstractEStage<T>
+    {
+        private readonly BlockingCollection<T> queue;
+        private readonly Thread thread;
+        private readonly Producer<T> producer;
+
+        /// <summary>Creates the bounded event queue and starts the thread that feeds queued events to the handler</summary>
+        /// <param name="handler">the handler invoked on the stage's own thread</param>
+        /// <param name="capacity">the bounded capacity of the event queue</param>
+        public SingleThreadStage(IEventHandler<T> handler, int capacity) : base(handler.GetType().FullName)
+        {
+            queue = new BlockingCollection<T>(capacity);
+            // Keep the producer: it owns the stop flag that Dispose must set. A bool handed to its
+            // constructor is copied, so flipping a field of this class would never reach the loop.
+            producer = new Producer<T>(queue, handler, false);
+            thread = new Thread(new ThreadStart(producer.Run));
+            thread.Start();
+        }
+
+        /// <summary>Puts the value to the queue, which will be processed by the handler later;
+        /// if the queue is full, the call blocks until space becomes available</summary>
+        /// <param name="value">the value</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            queue.Add(value);
+        }
+
+        /// <summary>Closes the stage: flags the worker loop to stop, then interrupts the thread in case it is blocked</summary>
+        public override void Dispose()
+        {
+            producer.Interrupt();
+            thread.Interrupt();
+        }
+    }
+
+    /// <summary>Takes events from the queue and provides them to the handler</summary>
+    /// <typeparam name="T">The type</typeparam>
+    internal class Producer<T>
+    {
+        private readonly BlockingCollection<T> _queue;
+        private readonly IEventHandler<T> _handler;
+        private volatile bool _interrupted;
+
+        internal Producer(BlockingCollection<T> queue, IEventHandler<T> handler, bool interrupted)
+        {
+            _queue = queue;
+            _handler = handler;
+            _interrupted = interrupted;
+        }
+
+        /// <summary>Asks <see cref="Run"/> to exit; events that are still queued are not delivered</summary>
+        internal void Interrupt()
+        {
+            _interrupted = true;
+        }
+
+        /// <summary>Delivers events until interrupted; exceptions (including handler failures) are swallowed so the loop survives</summary>
+        public void Run()
+        {
+            while (!_interrupted)
+            {
+                try
+                {
+                    _handler.OnNext(_queue.Take());
+                }
+                catch (Exception)
+                {
+                    // A handler failure or the interrupt sent by Dispose; the loop condition exits only after Interrupt().
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs
new file mode 100644
index 0000000..c6a7b22
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/SyncStage.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>A stage that calls its event handler inline from OnNext; its Dispose has nothing to do</summary>
+    public class SyncStage<T> : AbstractEStage<T>
+    {
+        private readonly IEventHandler<T> _target;
+
+        /// <summary>Creates a synchronous stage, passing the handler's full type name to the base stage</summary>
+        /// <param name="handler">the handler every event is forwarded to</param>
+        public SyncStage(IEventHandler<T> handler) : base(handler.GetType().FullName)
+        {
+            this._target = handler;
+        }
+
+        /// <summary>Passes the event to the base stage first, then directly to the handler</summary>
+        /// <param name="value">the event to process</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            this._target.OnNext(value);
+        }
+
+        public override void Dispose()
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs
new file mode 100644
index 0000000..f1bb67d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/ThreadPoolStage.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Stage that pairs each event with the handler and passes the pair to an ITaskService for execution</summary>
+    public class ThreadPoolStage<T> : AbstractEStage<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ThreadPoolStage<T>));
+
+        private readonly IEventHandler<T> _handler;
+        private readonly ITaskService _taskService;
+        // Positive only when this stage created _taskService itself; Dispose relies on that.
+        private readonly int _numThreads;
+
+        /// <summary>Creates a stage backed by a FixedThreadPoolTaskService that the stage constructs itself</summary>
+        /// <param name="handler">An event handler to execute</param>
+        /// <param name="numThreads">The number of threads to use; a value of 0 or less is reported as an error</param>
+        public ThreadPoolStage(IEventHandler<T> handler, int numThreads)
+            : base(handler.GetType().FullName)
+        {
+            _handler = handler;
+            if (numThreads <= 0)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
+            }
+            _numThreads = numThreads;
+            _taskService = new FixedThreadPoolTaskService(numThreads);
+        }
+
+        /// <summary>Creates a stage that runs on a caller-supplied task service</summary>
+        /// <param name="handler">an event handler to execute</param>
+        /// <param name="taskService">an external task service; this stage never shuts it down</param>
+        public ThreadPoolStage(IEventHandler<T> handler, ITaskService taskService)
+            : base(handler.GetType().FullName)
+        {
+            _numThreads = 0;
+            _taskService = taskService;
+            _handler = handler;
+        }
+
+        /// <summary>Notifies the base stage, then submits the event to the task service for the handler</summary>
+        /// <param name="value">an event</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            HandlerCall call = new HandlerCall(_handler, value);
+            _taskService.Execute(call.Start);
+        }
+
+        /// <summary>Shuts the task service down, but only when this stage created it</summary>
+        public override void Dispose()
+        {
+            if (_numThreads <= 0)
+            {
+                return;
+            }
+            _taskService.Shutdown();
+        }
+
+        /// <summary>Binds the handler to one event so the pair can be started later</summary>
+        private sealed class HandlerCall : IStartable
+        {
+            private readonly IEventHandler<T> _target;
+            private readonly T _payload;
+
+            public HandlerCall(IEventHandler<T> target, T value)
+            {
+                _target = target;
+                _payload = value;
+            }
+
+            public void Start()
+            {
+                _target.OnNext(_payload);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs b/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs
new file mode 100644
index 0000000..f2f2f35
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Impl/TimerStage.cs
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Timers;
+
+using Org.Apache.REEF.Wake;
+
+namespace Org.Apache.REEF.Wake.Impl
+{
+    /// <summary>Stage that triggers an event handler periodically</summary>
+    public class TimerStage : IStage
+    {
+        private readonly Timer _timer;
+        private readonly PeriodicEvent _value = new PeriodicEvent();
+        private readonly IEventHandler<PeriodicEvent> _handler;
+
+        /// <summary>Constructs a timer stage that invokes the handler once per period</summary>
+        /// <param name="handler">an event handler</param>
+        /// <param name="period">a period in milli-seconds</param>
+        public TimerStage(IEventHandler<PeriodicEvent> handler, long period) : this(handler, 0, period)
+        {
+        }
+
+        /// <summary>Constructs a timer stage and starts its timer; the timer interval is the period</summary>
+        /// <param name="handler">an event handler</param>
+        /// <param name="initialDelay">an initial delay; accepted by the signature but currently not used</param>
+        /// <param name="period">a period in milli-seconds</param>
+        public TimerStage(IEventHandler<PeriodicEvent> handler, long initialDelay, long period)
+        {
+            _handler = handler;
+            _timer = new Timer(period);
+            _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _handler, _value);
+            _timer.Enabled = true;
+        }
+
+        /// <summary>Closes resources: stops the timer and disposes it</summary>
+        public void Dispose()
+        {
+            _timer.Stop();
+            // Stop() only disables the timer; Dispose() is what releases the resources it holds.
+            _timer.Dispose();
+        }
+
+        /// <summary>Elapsed callback: forwards the given event to the given handler</summary>
+        private static void OnTimedEvent(object source, ElapsedEventArgs e, IEventHandler<PeriodicEvent> handler, PeriodicEvent value)
+        {
+            handler.OnNext(value);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
new file mode 100644
index 0000000..f383707
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -0,0 +1,216 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{CDFB3464-4041-42B1-9271-83AF24CD5008}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.REEF.Wake</RootNamespace>
+ <AssemblyName>Org.Apache.REEF.Wake</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <RestorePackages>true</RestorePackages>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
+ </PropertyGroup>
+ <Import Project="$(SolutionDir)\Source\build.props" />
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="protobuf-net">
+ <HintPath>$(PackagesDir)\protobuf-net.$(ProtobufVersion)\lib\net40\protobuf-net.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Reactive.Core">
+ <HintPath>$(PackagesDir)\Rx-Core.$(RxVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Interfaces">
+ <HintPath>$(PackagesDir)\Rx-Interfaces.$(RxVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Runtime.Serialization" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="AbstractEStage.cs" />
+ <Compile Include="IEStage.cs" />
+ <Compile Include="IEventHandler.cs" />
+ <Compile Include="IIdentifier.cs" />
+ <Compile Include="IIdentifierFactory.cs" />
+ <Compile Include="Impl\LoggingEventHandler.cs" />
+ <Compile Include="Impl\MissingStartHandlerHandler.cs" />
+ <Compile Include="Impl\MultiEventHandler.cs" />
+ <Compile Include="Impl\PeriodicEvent.cs" />
+ <Compile Include="Impl\PubSubEventHandler.cs" />
+ <Compile Include="Impl\SingleThreadStage.cs" />
+ <Compile Include="Impl\SyncStage.cs" />
+ <Compile Include="Impl\ThreadPoolStage.cs" />
+ <Compile Include="Impl\TimerStage.cs" />
+ <Compile Include="IObserverFactory.cs" />
+ <Compile Include="IStage.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Remote\ICodec.cs" />
+ <Compile Include="Remote\ICodecFactory.cs" />
+ <Compile Include="Remote\IDecoder.cs" />
+ <Compile Include="Remote\IEncoder.cs" />
+ <Compile Include="Remote\ILink.cs" />
+ <Compile Include="Remote\Impl\ByteCodec.cs" />
+ <Compile Include="Remote\Impl\ByteCodecFactory.cs" />
+ <Compile Include="Remote\Impl\Channel.cs" />
+ <Compile Include="Remote\Impl\DefaultRemoteManager.cs" />
+ <Compile Include="Remote\Impl\DefaultRemoteMessage.cs" />
+ <Compile Include="Remote\Impl\IntCodec.cs" />
+ <Compile Include="Remote\Impl\IPEndpointComparer.cs" />
+ <Compile Include="Remote\Impl\Link.cs" />
+ <Compile Include="Remote\Impl\MultiCodec.cs" />
+ <Compile Include="Remote\Impl\MultiDecoder.cs" />
+ <Compile Include="Remote\Impl\MultiEncoder.cs" />
+ <Compile Include="Remote\Impl\ObserverContainer.cs" />
+ <Compile Include="Remote\Impl\RemoteEvent.cs" />
+ <Compile Include="Remote\Impl\RemoteEventCodec.cs" />
+ <Compile Include="Remote\Impl\RemoteEventDecoder.cs" />
+ <Compile Include="Remote\Impl\RemoteEventEncoder.cs" />
+ <Compile Include="Remote\Impl\RemoteEventEndpoint.cs" />
+ <Compile Include="Remote\Impl\SocketRemoteIdentifier.cs" />
+ <Compile Include="Remote\Impl\StringCodec.cs" />
+ <Compile Include="Remote\Impl\StringIdentifier.cs" />
+ <Compile Include="Remote\Impl\StringIdentifierFactory.cs" />
+ <Compile Include="Remote\Impl\TransportClient.cs" />
+ <Compile Include="Remote\Impl\TransportEvent.cs" />
+ <Compile Include="Remote\Impl\TransportServer.cs" />
+ <Compile Include="Remote\IRemoteEvent.cs" />
+ <Compile Include="Remote\IRemoteIdentifier.cs" />
+ <Compile Include="Remote\IRemoteIdentifierFactory.cs" />
+ <Compile Include="Remote\IRemoteManager.cs" />
+ <Compile Include="Remote\IRemoteMessage.cs" />
+ <Compile Include="Remote\ISubscriptionManager.cs" />
+ <Compile Include="Remote\Proto\WakeRemoteProtos.cs" />
+ <Compile Include="Remote\RemoteConfiguration.cs" />
+ <Compile Include="Remote\RemoteRuntimeException.cs" />
+ <Compile Include="RX\AbstractObserver.cs" />
+ <Compile Include="RX\AbstractRxStage.cs" />
+ <Compile Include="RX\Impl\PubSubSubject.cs" />
+ <Compile Include="RX\Impl\RxSyncStage.cs" />
+ <Compile Include="RX\Impl\RxThreadPoolStage.cs" />
+ <Compile Include="RX\Impl\RxTimerStage.cs" />
+ <Compile Include="RX\Impl\SimpleSubject.cs" />
+ <Compile Include="RX\IRxStage.cs" />
+ <Compile Include="RX\IStaticObservable.cs" />
+ <Compile Include="RX\ISubject.cs" />
+ <Compile Include="RX\ObserverCompletedException.cs" />
+ <Compile Include="src\main\cs\Examples\P2p\IEventSource.cs" />
+ <Compile Include="src\main\cs\Examples\P2p\Pull2Push.cs" />
+ <Compile Include="src\main\cs\PeriodicEvent.cs" />
+ <Compile Include="Protobuf\WakeRemoteProtosGen.cs" />
+ <Compile Include="Time\Event\Alarm.cs" />
+ <Compile Include="Time\Event\StartTime.cs" />
+ <Compile Include="Time\Event\StopTime.cs" />
+ <Compile Include="Time\IClock.cs" />
+ <Compile Include="Time\Runtime\Event\ClientAlarm.cs" />
+ <Compile Include="Time\Runtime\Event\IdleClock.cs" />
+ <Compile Include="Time\Runtime\Event\RuntimeAlarm.cs" />
+ <Compile Include="Time\Runtime\Event\RuntimeStart.cs" />
+ <Compile Include="Time\Runtime\Event\RuntimeStop.cs" />
+ <Compile Include="Time\Runtime\ITimer.cs" />
+ <Compile Include="Time\Runtime\LogicalTimer.cs" />
+ <Compile Include="Time\Runtime\RealTimer.cs" />
+ <Compile Include="Time\Runtime\RuntimeClock.cs" />
+ <Compile Include="Time\Time.cs" />
+ <Compile Include="Util\Actionable.cs" />
+ <Compile Include="Util\Disposable.cs" />
+ <Compile Include="Util\FixedThreadPoolTaskService.cs" />
+ <Compile Include="Util\IStartable.cs" />
+ <Compile Include="Util\ITaskService.cs" />
+ <Compile Include="Util\LimitedConcurrencyLevelTaskScheduler.cs" />
+ <Compile Include="Util\NetworkUtils.cs" />
+ <Compile Include="Util\SerializationHelper.cs" />
+ <Compile Include="Util\TaskExtensions.cs" />
+ <Compile Include="Util\TimeHelper.cs" />
+ <Compile Include="WakeRuntimeException.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ <None Include="Protobuf\RemoteProtocol.proto" />
+ </ItemGroup>
+ <ItemGroup>
+ <Folder Include="Time\Time\Event\" />
+ <Folder Include="Time\Time\Runtime\Event\" />
+ <Folder Include="Util\Util\" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+ <Name>Org.Apache.REEF.Tang</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+ <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+ <Name>Org.Apache.REEF.Utilities</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..48a4764
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Wake")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Wake")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("86a66ac8-0c8e-4652-b533-670e800cb0ea")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto b/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto
new file mode 100644
index 0000000..cd28d13
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Protobuf/RemoteProtocol.proto
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+option java_package = "org.apache.reef.wake.remote.proto";
+option java_outer_classname = "WakeRemoteProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// A single Wake remote message: a required byte payload and a required 64-bit
+// "seq" value, plus optional "source" and "sink" strings.
+// NOTE(review): seq is presumably a per-sender sequence number, and source/sink
+// presumably identify the sending and receiving endpoints - confirm against the
+// remote manager code before relying on this.
+message WakeMessagePBuf {
+ required bytes data = 1;
+ required int64 seq = 2;
+ optional string source = 3;
+ optional string sink = 4;
+}
+
+// A (className, data) pair; both fields are required.
+// NOTE(review): className presumably names the type that "data" encodes - confirm
+// against the code that produces and consumes this message.
+message WakeTuplePBuf {
+ required string className = 1;
+ required bytes data = 2;
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs b/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs
new file mode 100644
index 0000000..f3c59f8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Protobuf/WakeRemoteProtosGen.cs
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//------------------------------------------------------------------------------
+// <auto-generated>
+// This code was generated by a tool.
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: src/main/proto/RemoteProtocol.proto
+namespace Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos
+{
+ /// <summary>
+ /// ProtoBuf contract class for the <c>WakeMessagePBuf</c> message: required <c>data</c>
+ /// (member 1) and <c>seq</c> (member 2), optional <c>source</c> (member 3) and
+ /// <c>sink</c> (member 4). The optional strings default to the empty string.
+ /// </summary>
+ /// <remarks>
+ /// This file is marked as auto-generated; code changes made here are lost if it is regenerated.
+ /// </remarks>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeMessagePBuf")]
+ public partial class WakeMessagePBuf : global::ProtoBuf.IExtensible
+ {
+ public WakeMessagePBuf() {}
+
+ private byte[] _data;
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] data
+ {
+ get { return _data; }
+ set { _data = value; }
+ }
+ private long _seq;
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"seq", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public long seq
+ {
+ get { return _seq; }
+ set { _seq = value; }
+ }
+ private string _source = "";
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"source", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string source
+ {
+ get { return _source; }
+ set { _source = value; }
+ }
+ private string _sink = "";
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"sink", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string sink
+ {
+ get { return _sink; }
+ set { _sink = value; }
+ }
+ // Backing store handed to ProtoBuf.Extensible by the explicit IExtensible implementation below.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ /// <summary>
+ /// ProtoBuf contract class for the <c>WakeTuplePBuf</c> message: required
+ /// <c>className</c> (member 1) and required <c>data</c> (member 2).
+ /// </summary>
+ /// <remarks>
+ /// This file is marked as auto-generated; code changes made here are lost if it is regenerated.
+ /// </remarks>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeTuplePBuf")]
+ public partial class WakeTuplePBuf : global::ProtoBuf.IExtensible
+ {
+ public WakeTuplePBuf() {}
+
+ private string _className;
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"className", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string className
+ {
+ get { return _className; }
+ set { _className = value; }
+ }
+ private byte[] _data;
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] data
+ {
+ get { return _data; }
+ set { _data = value; }
+ }
+ // Backing store handed to ProtoBuf.Extensible by the explicit IExtensible implementation below.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs
new file mode 100644
index 0000000..3c3451b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractObserver.cs
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+    /// <summary>
+    /// Base class for observers that only need to implement OnNext:
+    /// OnError and OnCompleted are provided and merely log the notification.
+    /// Both are virtual, so subclasses can still override them.
+    /// </summary>
+    /// <typeparam name="T">The type of event the observer receives</typeparam>
+    public abstract class AbstractObserver<T> : IObserver<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(AbstractObserver<T>));
+
+        /// <summary>Logs the received exception at Info level.</summary>
+        /// <param name="error">The exception reported to this observer</param>
+        public virtual void OnError(Exception error)
+        {
+            // A separating space before "has" was missing, which glued the
+            // observer's type name to the rest of the message.
+            LOGGER.Log(Level.Info, "The observer " + GetType() + " has received an Exception: " + error);
+        }
+
+        /// <summary>Logs the completion notification at Verbose level.</summary>
+        public virtual void OnCompleted()
+        {
+            LOGGER.Log(Level.Verbose, "The observer " + GetType() + " has received an onCompleted() ");
+        }
+
+        /// <summary>Handles the next event; must be supplied by the subclass.</summary>
+        /// <param name="arg1">The event to handle</param>
+        public abstract void OnNext(T arg1);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs
new file mode 100644
index 0000000..5238935
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/AbstractRxStage.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+ /// <summary>
+ /// Base class for Rx stages. It was written to implement metering, but every
+ /// meter statement below is commented out, so the constructor and OnNext
+ /// currently do nothing.
+ /// </summary>
+ /// <typeparam name="T">The type of event the stage observes</typeparam>
+ public abstract class AbstractRxStage<T> : IRxStage<T>
+ {
+ //protected internal readonly Meter meter;
+
+ /// <summary>Constructs an abstract rxstage</summary>
+ /// <param name="meterName">the name of the meter (unused while metering is commented out)</param>
+ public AbstractRxStage(string meterName)
+ {
+ //meter = new Meter(meterName);
+ }
+
+ /// <summary>Intended to update the meter; a no-op while metering is commented out</summary>
+ /// <param name="value">the event</param>
+ public virtual void OnNext(T value)
+ {
+ //meter.Mark(1);
+ }
+
+ /// <summary>Completion notification; must be implemented by the subclass</summary>
+ public abstract void OnCompleted();
+
+ /// <summary>Error notification; must be implemented by the subclass</summary>
+ /// <param name="error">the error</param>
+ public abstract void OnError(Exception error);
+
+ /// <summary>Closes the stage; the base implementation does nothing</summary>
+ public virtual void Dispose()
+ {
+ // no op
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs
new file mode 100644
index 0000000..bc93a0e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/IRxStage.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+ /// <summary>
+ /// Stage that executes the observer: combines IObserver&lt;T&gt; with IStage
+ /// and declares no members of its own.
+ /// </summary>
+ /// <typeparam name="T">The type of event the stage observes</typeparam>
+ public interface IRxStage<T> : IObserver<T>, IStage
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs b/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs
new file mode 100644
index 0000000..035953c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/IStaticObservable.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+ /// <summary>
+ /// Marker interface with no members. In this project it is implemented by
+ /// ISubject and RxTimerStage.
+ /// </summary>
+ public interface IStaticObservable
+ {
+ //intentionally empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs
new file mode 100644
index 0000000..9829dfd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/ISubject.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+ /// <summary>
+ /// An interface combining IObserver&lt;In&gt; and IStaticObservable; it declares
+ /// no members of its own.
+ /// </summary>
+ /// <typeparam name="In">The in type</typeparam>
+ /// <typeparam name="Out">The out type (not referenced by any member of this interface)</typeparam>
+ public interface ISubject<In, Out> : IObserver<In>, IStaticObservable
+ {
+ // intentionally empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs
new file mode 100644
index 0000000..73962dd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/PubSubSubject.cs
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Subjects;
+using System.Reflection;
+using System.Text;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+    /// <summary>
+    /// Subject to provide publish/subscribe interface.
+    /// Observers subscribe for an event type, and OnNext invokes the observers
+    /// registered for the exact runtime type of the event it is given.
+    /// </summary>
+    /// <typeparam name="T">The super type that all event types
+    /// inherit from</typeparam>
+    public class PubSubSubject<T> : IObserver<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubSubject<T>));
+
+        // Maps an event type to the IObserver<that type> instances subscribed for it.
+        private readonly Dictionary<Type, List<object>> _classToObserversMap;
+
+        // Guards _classToObserversMap, its lists, and _completed.
+        private readonly object _mutex;
+        private bool _completed;
+
+        /// <summary>
+        /// Constructs a pub-sub Subject
+        /// </summary>
+        public PubSubSubject()
+        {
+            _classToObserversMap = new Dictionary<Type, List<object>>();
+            _mutex = new object();
+        }
+
+        /// <summary>
+        /// Marks the subject as completed so that later calls to OnNext are ignored.
+        /// The notification is not forwarded to the subscribed observers.
+        /// </summary>
+        public void OnCompleted()
+        {
+            lock (_mutex)
+            {
+                _completed = true;
+            }
+        }
+
+        /// <summary>
+        /// Marks the subject as completed so that later calls to OnNext are ignored.
+        /// The error is neither logged nor forwarded to the subscribed observers.
+        /// </summary>
+        /// <param name="error">The reported error (currently unused)</param>
+        public void OnError(Exception error)
+        {
+            lock (_mutex)
+            {
+                _completed = true;
+            }
+        }
+
+        /// <summary>
+        /// Invoke the subscribed handlers for the event class type.
+        /// A null event, or an event whose type has never been subscribed, is
+        /// reported through Exceptions.Throw (ArgumentNullException and
+        /// ArgumentException respectively). Handlers run while the subject's
+        /// lock is held.
+        /// </summary>
+        /// <param name="value">The event to process</param>
+        public void OnNext(T value)
+        {
+            if (value == null)
+            {
+                Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+            }
+
+            lock (_mutex)
+            {
+                // If OnCompleted or OnError called, do nothing
+                if (_completed)
+                {
+                    return;
+                }
+
+                // Check that the event type has been subscribed
+                Type eventType = value.GetType();
+                List<object> handlers;
+                if (!_classToObserversMap.TryGetValue(eventType, out handlers))
+                {
+                    Exceptions.Throw(new ArgumentException("No event for type " + eventType), LOGGER);
+                }
+
+                // The IObserver<eventType>.OnNext lookup is the same for every
+                // handler, so resolve it once instead of once per handler.
+                Type handlerType = typeof(IObserver<>).MakeGenericType(new[] { eventType });
+                MethodInfo onNext = handlerType.GetMethod("OnNext");
+                object[] arguments = new[] { (object) value };
+
+                // Iterate over a snapshot: the lock is re-entrant, so a handler may
+                // dispose its subscription (or subscribe another observer for this
+                // type) from inside OnNext. Mutating the live list while it is being
+                // enumerated would throw InvalidOperationException.
+                foreach (object handler in handlers.ToArray())
+                {
+                    onNext.Invoke(handler, arguments);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Subscribe an IObserver for an event type
+        /// </summary>
+        /// <typeparam name="U">The event type</typeparam>
+        /// <param name="observer">The observer to handle the event</param>
+        /// <returns>An IDisposable object used to handle unsubscribing
+        /// the IObserver</returns>
+        public IDisposable Subscribe<U>(IObserver<U> observer) where U : T
+        {
+            lock (_mutex)
+            {
+                List<object> observers;
+                if (!_classToObserversMap.TryGetValue(typeof(U), out observers))
+                {
+                    observers = new List<object>();
+                    _classToObserversMap[typeof(U)] = observers;
+                }
+                observers.Add(observer);
+            }
+
+            return new DisposableResource<U>(_classToObserversMap, observer, _mutex);
+        }
+
+        /// <summary>
+        /// Utility class to handle disposing of an IObserver
+        /// </summary>
+        private class DisposableResource<U> : IDisposable
+        {
+            private readonly Dictionary<Type, List<object>> _observersMap;
+            private readonly IObserver<U> _observer;
+            private readonly object _mutex;
+            private bool _disposed;
+
+            public DisposableResource(Dictionary<Type, List<object>> observersMap, IObserver<U> observer, object mutex)
+            {
+                _observersMap = observersMap;
+                _observer = observer;
+                _mutex = mutex;
+                _disposed = false;
+            }
+
+            /// <summary>
+            /// Unsubscribe the IObserver from the observer map.
+            /// Calling Dispose more than once has no further effect.
+            /// </summary>
+            public void Dispose()
+            {
+                // The disposed flag is checked and set under the same lock as the
+                // removal, so two racing Dispose calls cannot both remove an entry
+                // (which would drop a second subscription of the same observer).
+                lock (_mutex)
+                {
+                    if (_disposed)
+                    {
+                        return;
+                    }
+
+                    List<object> observers;
+                    if (_observersMap.TryGetValue(typeof(U), out observers))
+                    {
+                        observers.Remove(_observer);
+                    }
+                    _disposed = true;
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs
new file mode 100644
index 0000000..e2527a0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxSyncStage.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+ /// <summary>
+ /// Stage that executes the observer synchronously: every notification is
+ /// forwarded to the wrapped observer on the calling thread.
+ /// </summary>
+ /// <typeparam name="T">The type of event the stage observes</typeparam>
+ public class RxSyncStage<T> : AbstractRxStage<T>
+ {
+ private readonly IObserver<T> _observer;
+
+ /// <summary>Constructs a Rx synchronous stage</summary>
+ /// <param name="observer">the observer; its type's full name is passed to the base
+ /// class as the meter name, so it is dereferenced before the constructor body runs</param>
+ public RxSyncStage(IObserver<T> observer) : base(observer.GetType().FullName)
+ {
+ _observer = observer;
+ }
+
+ /// <summary>Provides the observer with the new value</summary>
+ /// <param name="value">the new value</param>
+ public override void OnNext(T value)
+ {
+ // Base hook first (intended for metering), then the wrapped observer.
+ base.OnNext(value);
+ _observer.OnNext(value);
+ }
+
+ /// <summary>
+ /// Notifies the observer that the provider has experienced an error
+ /// condition.
+ /// </summary>
+ /// <param name="error">the error</param>
+ public override void OnError(Exception error)
+ {
+ _observer.OnError(error);
+ }
+
+ /// <summary>
+ /// Notifies the observer that the provider has finished sending push-based
+ /// notifications.
+ /// </summary>
+ public override void OnCompleted()
+ {
+ _observer.OnCompleted();
+ }
+
+ /// <summary>
+ /// Closes the stage. The body is empty, so nothing is released and the
+ /// wrapped observer is left untouched.
+ /// </summary>
+ public override void Dispose()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs
new file mode 100644
index 0000000..d329430
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxThreadPoolStage.cs
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Util;
+using System;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+    /// <summary>
+    /// Stage that hands every notification for the wrapped observer to a
+    /// fixed-size thread pool task service instead of running it inline.
+    /// </summary>
+    /// <typeparam name="T">The type of event the stage observes</typeparam>
+    public class RxThreadPoolStage<T> : AbstractRxStage<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RxThreadPoolStage<T>));
+
+        private readonly IObserver<T> _observer;
+
+        private readonly ITaskService _taskService;
+
+        /// <summary>Constructs a Rx thread pool stage</summary>
+        /// <param name="observer">the observer to execute</param>
+        /// <param name="numThreads">the number of threads; must be positive</param>
+        public RxThreadPoolStage(IObserver<T> observer, int numThreads)
+            : base(observer.GetType().FullName)
+        {
+            _observer = observer;
+            if (numThreads < 1)
+            {
+                Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
+            }
+            _taskService = new FixedThreadPoolTaskService(numThreads);
+        }
+
+        /// <summary>Queues delivery of the new value to the observer</summary>
+        /// <param name="value">the new value</param>
+        public override void OnNext(T value)
+        {
+            base.OnNext(value);
+            _taskService.Execute(() => _observer.OnNext(value));
+        }
+
+        /// <summary>
+        /// Queues delivery of an error notification to the observer.
+        /// </summary>
+        /// <param name="error">the error</param>
+        public override void OnError(Exception error)
+        {
+            _taskService.Execute(() => _observer.OnError(error));
+        }
+
+        /// <summary>
+        /// Queues delivery of the completion notification to the observer.
+        /// </summary>
+        public override void OnCompleted()
+        {
+            _taskService.Execute(() => _observer.OnCompleted());
+        }
+
+        /// <summary>
+        /// Closes the stage by shutting down its task service
+        /// </summary>
+        public override void Dispose()
+        {
+            _taskService.Shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs
new file mode 100644
index 0000000..541c2d4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/RxTimerStage.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Timers;
+
+using Org.Apache.REEF.Wake.Impl;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+    /// <summary>Timer stage that provides events to the observer periodically</summary>
+    public class RxTimerStage : IStage, IStaticObservable
+    {
+        private readonly Timer _timer;
+
+        // The same PeriodicEvent instance is handed to the observer on every tick.
+        private readonly PeriodicEvent _value = new PeriodicEvent();
+        private readonly IObserver<PeriodicEvent> _observer;
+
+        /// <summary>Constructs a Rx timer stage</summary>
+        /// <param name="observer">the observer</param>
+        /// <param name="period">the period in milli-seconds</param>
+        public RxTimerStage(IObserver<PeriodicEvent> observer, long period)
+            : this(observer, 0, period)
+        {
+        }
+
+        /// <summary>Constructs a Rx timer stage and starts its timer</summary>
+        /// <param name="observer">the observer</param>
+        /// <param name="initialDelay">the initial delay in milli-seconds.
+        /// NOTE: this value is currently not applied; the timer is created with
+        /// <paramref name="period"/> as its interval, so the first event is raised
+        /// after one period regardless of this argument. The behavior is kept
+        /// as-is so existing callers see no timing change.</param>
+        /// <param name="period">the period in milli-seconds</param>
+        public RxTimerStage(IObserver<PeriodicEvent> observer, long initialDelay, long period)
+        {
+            _observer = observer;
+            _timer = new Timer(period);
+            _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _observer, _value);
+            _timer.Enabled = true;
+        }
+
+        /// <summary>
+        /// Closes the stage: stops the timer and releases it.
+        /// </summary>
+        public void Dispose()
+        {
+            _timer.Stop();
+
+            // Stop() alone leaves the timer's resources allocated; the timer is
+            // owned by this stage, so release it here.
+            _timer.Dispose();
+        }
+
+        // Elapsed handler: forwards the shared periodic event to the observer.
+        private static void OnTimedEvent(object source, ElapsedEventArgs e, IObserver<PeriodicEvent> observer, PeriodicEvent value)
+        {
+            observer.OnNext(value);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs
new file mode 100644
index 0000000..7d4aaa8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/Impl/SimpleSubject.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX.Impl
+{
+ /// <summary>
+ /// A pass-through subject: every notification it receives is handed,
+ /// unchanged, to the observer supplied at construction time.
+ /// </summary>
+ /// <typeparam name="T">the type of value that flows through the subject</typeparam>
+ public class SimpleSubject<T> : ISubject<T, T>
+ {
+     private readonly IObserver<T> _downstream;
+
+     /// <summary>Creates a subject that forwards to the given observer</summary>
+     /// <param name="observer">the observer that receives every forwarded notification</param>
+     public SimpleSubject(IObserver<T> observer)
+     {
+         this._downstream = observer;
+     }
+
+     /// <summary>Forwards a new value to the wrapped observer</summary>
+     /// <param name="value">the value to forward</param>
+     public virtual void OnNext(T value)
+     {
+         this._downstream.OnNext(value);
+     }
+
+     /// <summary>Forwards an error to the wrapped observer</summary>
+     /// <param name="error">the error to forward</param>
+     public virtual void OnError(Exception error)
+     {
+         this._downstream.OnError(error);
+     }
+
+     /// <summary>
+     /// Forwards the completion signal to the wrapped observer
+     /// </summary>
+     public virtual void OnCompleted()
+     {
+         this._downstream.OnCompleted();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs b/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs
new file mode 100644
index 0000000..e620a7c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/RX/ObserverCompletedException.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.RX
+{
+ /// <summary>
+ /// Signals a violation of the observer calling protocol: it is illegal to call
+ /// OnError() or OnCompleted() when a call to OnNext() is still outstanding, or
+ /// to call OnNext(), OnError() or OnCompleted() after a call to OnError() or
+ /// OnCompleted() has been dispatched.
+ /// </summary>
+ /// <remarks>
+ /// Observers may throw an ObserverCompletedException whenever this API is
+ /// violated. Violating the API leaves the Observer (and any resources that it
+ /// holds) in an undefined state, and throwing this exception is optional.
+ /// Callers receiving this exception should simply pass it up the stack to the
+ /// Aura runtime. They should not attempt to forward it on to upstream or
+ /// downstream stages. The easiest way to do this is to ignore the exception
+ /// entirely.
+ /// </remarks>
+ [System.Serializable]
+ public class ObserverCompletedException : InvalidOperationException
+ {
+     /// <summary>Creates the exception with the default message</summary>
+     public ObserverCompletedException()
+     {
+     }
+
+     /// <summary>Creates the exception with the given message</summary>
+     /// <param name="message">the message that describes the violation</param>
+     public ObserverCompletedException(string message)
+         : base(message)
+     {
+     }
+
+     /// <summary>Creates the exception with the given message and cause</summary>
+     /// <param name="message">the message that describes the violation</param>
+     /// <param name="innerException">the exception that caused this one</param>
+     public ObserverCompletedException(string message, Exception innerException)
+         : base(message, innerException)
+     {
+     }
+
+     /// <summary>
+     /// Deserialization constructor; a [Serializable] exception type needs it
+     /// so that instances can be reconstructed from serialized data.
+     /// </summary>
+     /// <param name="info">the serialized object data</param>
+     /// <param name="context">the source or destination of the serialized stream</param>
+     protected ObserverCompletedException(
+         System.Runtime.Serialization.SerializationInfo info,
+         System.Runtime.Serialization.StreamingContext context)
+         : base(info, context)
+     {
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs
new file mode 100644
index 0000000..2591d1e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodec.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+ /// <summary>
+ /// Non-generic base interface of <see cref="ICodec{T}"/>. It declares no
+ /// members of its own.
+ /// </summary>
+ public interface ICodec
+ {
+ }
+
+ /// <summary>
+ /// Interface for serialization routines that translate back and forth between
+ /// objects of type <typeparamref name="T"/> and byte arrays with low latency.
+ /// It combines <see cref="IEncoder{T}"/> and <see cref="IDecoder{T}"/> for the
+ /// same type and adds no members of its own.
+ /// </summary>
+ /// <typeparam name="T">The type of object that is encoded and decoded</typeparam>
+ public interface ICodec<T> : ICodec, IEncoder<T>, IDecoder<T>
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs
new file mode 100644
index 0000000..68b656a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ICodecFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+ /// <summary>
+ /// Factory interface with a single <see cref="Create"/> method. The
+ /// DefaultImplementation attribute names ByteCodecFactory as the default
+ /// implementation.
+ /// </summary>
+ [DefaultImplementation(typeof(ByteCodecFactory))]
+ public interface ICodecFactory
+ {
+ /// <summary>Creates an object</summary>
+ /// <returns>
+ /// the created object. NOTE(review): presumably a codec, but the declared
+ /// return type is only <c>object</c> - confirm against ByteCodecFactory.
+ /// </returns>
+ object Create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs
new file mode 100644
index 0000000..07dd28a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IDecoder.cs
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+ /// <summary>
+ /// Non-generic base interface of <see cref="IDecoder{T}"/>. It declares no
+ /// members of its own.
+ /// </summary>
+ public interface IDecoder
+ {
+ }
+
+ /// <summary>
+ /// Interface for deserialization routines that translate byte arrays into
+ /// objects of type <typeparamref name="T"/>.
+ /// </summary>
+ /// <typeparam name="T">The type of object produced by decoding</typeparam>
+ public interface IDecoder<T> : IDecoder
+ {
+ /// <summary>Decodes the given byte array into an object</summary>
+ /// <param name="data">the byte array to decode</param>
+ /// <returns>the decoded object</returns>
+ T Decode(byte[] data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs
new file mode 100644
index 0000000..b399131
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IEncoder.cs
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+ /// <summary>
+ /// Non-generic base interface of <see cref="IEncoder{T}"/>. It declares no
+ /// members of its own.
+ /// </summary>
+ public interface IEncoder
+ {
+ }
+
+ /// <summary>
+ /// Interface for serialization routines that translate objects of type
+ /// <typeparamref name="T"/> into byte arrays.
+ /// </summary>
+ /// <typeparam name="T">The type of object accepted for encoding</typeparam>
+ public interface IEncoder<T> : IEncoder
+ {
+ /// <summary>Encodes the given object into a Byte Array</summary>
+ /// <param name="obj">the object to encode</param>
+ /// <returns>a byte[] representation of the object</returns>
+ byte[] Encode(T obj);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs
new file mode 100644
index 0000000..c0ce1a2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ILink.cs
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+ /// <summary>
+ /// Represents a link between two endpoints over which values of type
+ /// <typeparamref name="T"/> can be written and read, either synchronously
+ /// or asynchronously. The link is disposable.
+ /// </summary>
+ /// <typeparam name="T">The type of value written to and read from the link</typeparam>
+ public interface ILink<T> : IDisposable
+ {
+ /// <summary>
+ /// Returns the local socket address
+ /// </summary>
+ IPEndPoint LocalEndpoint { get; }
+
+ /// <summary>
+ /// Returns the remote socket address
+ /// </summary>
+ IPEndPoint RemoteEndpoint { get; }
+
+ /// <summary>
+ /// Writes the value to this link asynchronously
+ /// </summary>
+ /// <param name="value">The data to write</param>
+ /// <param name="token">The cancellation token</param>
+ /// <returns>A task representing the asynchronous write</returns>
+ Task WriteAsync(T value, CancellationToken token);
+
+ /// <summary>
+ /// Writes the value to this link synchronously
+ /// </summary>
+ /// <param name="value">The data to write</param>
+ void Write(T value);
+
+ /// <summary>
+ /// Reads the value from this link asynchronously
+ /// </summary>
+ /// <param name="token">The cancellation token</param>
+ /// <returns>A task whose result is the read data</returns>
+ Task<T> ReadAsync(CancellationToken token);
+
+ /// <summary>
+ /// Reads the value from this link synchronously
+ /// </summary>
+ /// <returns>The read data</returns>
+ T Read();
+ }
+}