You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:05:47 UTC
[26/51] [partial] incubator-reef git commit: [REEF-131] Towards the
new .Net project structure This is to change .Net project structure for Tang,
Wake, REEF utilities, Common and Driver:
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs
new file mode 100644
index 0000000..85d6359
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportClient.cs
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Client side of the Wake transport layer. Wraps a Link to a remote endpoint,
+    /// writes outgoing messages to it and, when an observer is supplied, forwards
+    /// every message read from the link to that observer.
+    /// </summary>
+    /// <typeparam name="T">The type of message exchanged over the link</typeparam>
+    public class TransportClient<T> : IDisposable
+    {
+        private readonly ILink<T> _link;
+        private readonly IObserver<TransportEvent<T>> _observer;
+        private readonly CancellationTokenSource _cancellationSource;
+        private bool _disposed;
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="codec">Codec used to encode/decode messages</param>
+        /// <exception cref="ArgumentNullException">
+        /// If remoteEndpoint or codec is null
+        /// </exception>
+        public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            _link = new Link<T>(remoteEndpoint, codec);
+            _cancellationSource = new CancellationTokenSource();
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint and to
+        /// deliver responses read from the link to the given observer.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="codec">Codec used to encode/decode messages</param>
+        /// <param name="observer">Callback used when receiving responses from remote host</param>
+        /// <exception cref="ArgumentNullException">
+        /// If remoteEndpoint, codec or observer is null
+        /// </exception>
+        public TransportClient(IPEndPoint remoteEndpoint,
+                               ICodec<T> codec,
+                               IObserver<TransportEvent<T>> observer)
+            : this(remoteEndpoint, codec)
+        {
+            if (observer == null)
+            {
+                // The chained constructor already created the link; release it
+                // before failing so a rejected call does not leak it.
+                _link.Dispose();
+                throw new ArgumentNullException("observer");
+            }
+
+            _observer = observer;
+
+            // Capture the token up front: the loop runs on another thread and must
+            // not touch the CancellationTokenSource after Dispose has released it.
+            CancellationToken token = _cancellationSource.Token;
+            Task.Run(() => ResponseLoop(token));
+        }
+
+        /// <summary>
+        /// Gets the underlying transport link.
+        /// </summary>
+        public ILink<T> Link
+        {
+            get { return _link; }
+        }
+
+        /// <summary>
+        /// Send the remote message.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        /// <exception cref="ArgumentNullException">If message is null</exception>
+        public void Send(T message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            _link.Write(message);
+        }
+
+        /// <summary>
+        /// Stop the response loop (if any) and close the underlying link.
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Releases the resources held by this client. Does nothing when the
+        /// client is already disposed or when <paramref name="disposing"/> is false.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose()</param>
+        protected virtual void Dispose(bool disposing)
+        {
+            if (_disposed || !disposing)
+            {
+                return;
+            }
+
+            _disposed = true;
+            try
+            {
+                // Signal the response loop first so it stops issuing reads
+                // against a link that is about to be disposed.
+                _cancellationSource.Cancel();
+            }
+            finally
+            {
+                _link.Dispose();
+                _cancellationSource.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Continually read responses from remote host and hand each one to the
+        /// observer. The loop ends when cancellation is requested or when the
+        /// link returns a null message.
+        /// </summary>
+        /// <param name="token">Token cancelled by Dispose to stop the loop</param>
+        private async Task ResponseLoop(CancellationToken token)
+        {
+            try
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    T message = await _link.ReadAsync(token);
+                    if (message == null)
+                    {
+                        break;
+                    }
+
+                    _observer.OnNext(new TransportEvent<T>(message, _link));
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                // A pending read may be aborted when Dispose cancels the token;
+                // that is a normal shutdown, not an error.
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportEvent.cs
new file mode 100644
index 0000000..0e1eff7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportEvent.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Immutable pairing of a received message with the link it arrived on.
+    /// </summary>
+    /// <typeparam name="T">The type of the message payload</typeparam>
+    public class TransportEvent<T>
+    {
+        private readonly T _data;
+        private readonly ILink<T> _link;
+
+        /// <summary>
+        /// Creates an event for the given payload and link.
+        /// </summary>
+        /// <param name="data">The message payload</param>
+        /// <param name="link">The link associated with the payload</param>
+        public TransportEvent(T data, ILink<T> link)
+        {
+            _data = data;
+            _link = link;
+        }
+
+        /// <summary>
+        /// Gets the message payload supplied at construction.
+        /// </summary>
+        public T Data
+        {
+            get { return _data; }
+        }
+
+        /// <summary>
+        /// Gets the link supplied at construction.
+        /// </summary>
+        public ILink<T> Link
+        {
+            get { return _link; }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
new file mode 100644
index 0000000..c953789
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Globalization;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Server to handle incoming remote messages. Accepts TCP clients on a
+    /// background task and forwards every message read from a client to the
+    /// configured remote handler as a TransportEvent.
+    /// </summary>
+    /// <typeparam name="T">The type of message decoded from client connections</typeparam>
+    public class TransportServer<T> : IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>));
+
+        // Upper bound, in milliseconds, that Dispose waits for the accept loop to exit.
+        private const int ServerShutdownTimeoutMs = 500;
+
+        private readonly TcpListener _listener;
+        private readonly CancellationTokenSource _cancellationSource;
+        private readonly IObserver<TransportEvent<T>> _remoteObserver;
+        private readonly ICodec<T> _codec;
+        private bool _disposed;
+        private Task _serverTask;
+
+        /// <summary>
+        /// Constructs a TransportServer to listen for remote events.
+        /// Listens on the given port of the local IP address. When it receives a
+        /// remote event, it will invoke the specified remote handler.
+        /// </summary>
+        /// <param name="port">Port to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        /// <param name="codec">The codec to encode/decode</param>
+        public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec)
+            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec)
+        {
+        }
+
+        /// <summary>
+        /// Constructs a TransportServer to listen for remote events.
+        /// Listens on the specified local endpoint. When it receives a remote
+        /// event, it will invoke the specified remote handler.
+        /// </summary>
+        /// <param name="localEndpoint">Endpoint to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        /// <param name="codec">The codec to encode/decode</param>
+        /// <exception cref="ArgumentNullException">If any argument is null</exception>
+        public TransportServer(IPEndPoint localEndpoint,
+                               IObserver<TransportEvent<T>> remoteHandler,
+                               ICodec<T> codec)
+        {
+            if (localEndpoint == null)
+            {
+                throw new ArgumentNullException("localEndpoint");
+            }
+            if (remoteHandler == null)
+            {
+                throw new ArgumentNullException("remoteHandler");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
+            _remoteObserver = remoteHandler;
+            _cancellationSource = new CancellationTokenSource();
+            _codec = codec;
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Returns the listening endpoint for the TransportServer
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _listener.LocalEndpoint as IPEndPoint; }
+        }
+
+        /// <summary>
+        /// Starts listening for incoming remote messages.
+        /// </summary>
+        public void Run()
+        {
+            _listener.Start();
+            _serverTask = Task.Run(() => StartServer());
+        }
+
+        /// <summary>
+        /// Close the TransportServer and all open connections
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Cancels the server, stops the listener and waits a bounded time for the
+        /// accept loop to finish. Does nothing when the server is already disposed
+        /// or when <paramref name="disposing"/> is false.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose()</param>
+        public void Dispose(bool disposing)
+        {
+            if (_disposed || !disposing)
+            {
+                return;
+            }
+
+            _disposed = true;
+            _cancellationSource.Cancel();
+            try
+            {
+                _listener.Stop();
+            }
+            catch (SocketException)
+            {
+                LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
+            }
+
+            if (_serverTask == null)
+            {
+                // Run() was never called, so there is no accept loop to wait for.
+                return;
+            }
+
+            // Give the TransportServer Task 500ms to shut down, ignore any timeout errors
+            try
+            {
+                _serverTask.Wait(ServerShutdownTimeoutMs);
+            }
+            catch (Exception e)
+            {
+                Console.Error.WriteLine(e);
+            }
+            finally
+            {
+                // Task.Dispose throws unless the task has reached a completion
+                // state, so skip it when the wait above timed out.
+                if (_serverTask.IsCompleted)
+                {
+                    _serverTask.Dispose();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Helper method to start TransportServer. This will
+        /// be run in an asynchronous Task.
+        /// </summary>
+        /// <returns>An asynchronous Task for the running server.</returns>
+        private async Task StartServer()
+        {
+            try
+            {
+                while (!_cancellationSource.Token.IsCancellationRequested)
+                {
+                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+
+                    // Each client is served on its own task; the accept loop does not wait for it.
+                    ProcessClient(client).Forget();
+                }
+            }
+            catch (InvalidOperationException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+            catch (OperationCanceledException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+        }
+
+        /// <summary>
+        /// Receives events from the connected TcpClient and invokes the handler on
+        /// each one. Stops when cancellation is requested or the link returns a
+        /// null message; a null message is not forwarded to the handler.
+        /// </summary>
+        /// <param name="client">The connected client</param>
+        private async Task ProcessClient(TcpClient client)
+        {
+            // Keep reading messages from client until they disconnect or timeout
+            CancellationToken token = _cancellationSource.Token;
+            using (ILink<T> link = new Link<T>(client, _codec))
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    T message = await link.ReadAsync(token);
+                    if (message == null)
+                    {
+                        break;
+                    }
+
+                    _remoteObserver.OnNext(new TransportEvent<T>(message, link));
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/Proto/WakeRemoteProtos.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Proto/WakeRemoteProtos.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Proto/WakeRemoteProtos.cs
new file mode 100644
index 0000000..8731b1d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Proto/WakeRemoteProtos.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Wake.Remote.Proto.WakeRemoteProtos
+{
+    /// <summary>
+    /// Byte-array serialization helpers for the WakeMessagePBuf message.
+    /// </summary>
+    public partial class WakeMessagePBuf
+    {
+        /// <summary>
+        /// Reads a WakeMessagePBuf from its serialized form.
+        /// </summary>
+        /// <param name="bytes">The serialized message</param>
+        /// <returns>The message produced by the protobuf Serializer</returns>
+        public static WakeMessagePBuf Deserialize(byte[] bytes)
+        {
+            using (MemoryStream input = new MemoryStream(bytes))
+            {
+                return Serializer.Deserialize<WakeMessagePBuf>(input);
+            }
+        }
+
+        /// <summary>
+        /// Writes this message to a new byte array.
+        /// </summary>
+        /// <returns>The serialized message</returns>
+        public byte[] Serialize()
+        {
+            byte[] payload;
+            using (MemoryStream output = new MemoryStream())
+            {
+                Serializer.Serialize(output, this);
+                payload = output.ToArray();
+            }
+            return payload;
+        }
+    }
+
+    /// <summary>
+    /// Byte-array serialization helpers for the WakeTuplePBuf message.
+    /// </summary>
+    public partial class WakeTuplePBuf
+    {
+        /// <summary>
+        /// Reads a WakeTuplePBuf from its serialized form.
+        /// </summary>
+        /// <param name="bytes">The serialized tuple</param>
+        /// <returns>The tuple produced by the protobuf Serializer</returns>
+        public static WakeTuplePBuf Deserialize(byte[] bytes)
+        {
+            using (MemoryStream input = new MemoryStream(bytes))
+            {
+                return Serializer.Deserialize<WakeTuplePBuf>(input);
+            }
+        }
+
+        /// <summary>
+        /// Writes this tuple to a new byte array.
+        /// </summary>
+        /// <returns>The serialized tuple</returns>
+        public byte[] Serialize()
+        {
+            byte[] payload;
+            using (MemoryStream output = new MemoryStream())
+            {
+                Serializer.Serialize(output, this);
+                payload = output.ToArray();
+            }
+            return payload;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteConfiguration.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteConfiguration.cs
new file mode 100644
index 0000000..655155f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteConfiguration.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Container for the Tang named parameters that describe a remote manager:
+    /// its name, host address, port, message codec, error handler and whether
+    /// message ordering is guaranteed.
+    /// </summary>
+    public class RemoteConfiguration
+    {
+        /// <summary>The name of the remote manager.</summary>
+        [NamedParameter(documentation: "The name of the remote manager.", shortName: "rm_name")]
+        public class ManagerName : Name<string>
+        {
+        }
+
+        /// <summary>The host address to be used for messages.</summary>
+        [NamedParameter(documentation: "The host address to be used for messages.", shortName: "rm_host")]
+        public class HostAddress : Name<string>
+        {
+        }
+
+        /// <summary>The port to be used for messages.</summary>
+        [NamedParameter(documentation: "The port to be used for messages.", shortName: "rm_port")]
+        public class Port : Name<int>
+        {
+        }
+
+        /// <summary>The codec to be used for messages.</summary>
+        [NamedParameter(documentation: "The codec to be used for messages.")]
+        public class MessageCodec : Name<ICodec<Type>>
+        {
+        }
+
+        /// <summary>The event handler to be used for exceptions.</summary>
+        [NamedParameter(documentation: "The event handler to be used for exception")]
+        public class ErrorHandler : Name<IObserver<Exception>>
+        {
+        }
+
+        /// <summary>
+        /// Whether or not to use the message ordering guarantee; defaults to "true".
+        /// </summary>
+        [NamedParameter(documentation: "Whether or not to use the message ordering guarantee", shortName: "rm_order", defaultValue: "true")]
+        public class OrderingGuarantee : Name<bool>
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteRuntimeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteRuntimeException.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteRuntimeException.cs
new file mode 100644
index 0000000..150e5c8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/RemoteRuntimeException.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.Remote
+{
+    /// <summary>
+    /// Wake remote runtime exception. Declared serializable, so it also provides
+    /// the protected constructor the runtime uses to rebuild it from serialized data.
+    /// </summary>
+    [System.Serializable]
+    public class RemoteRuntimeException : Exception
+    {
+        /// <summary>Constructs a new runtime remote exception with the specified detail message and cause
+        /// </summary>
+        /// <param name="s">the detailed message</param>
+        /// <param name="e">the cause</param>
+        public RemoteRuntimeException(string s, Exception e)
+            : base(s, e)
+        {
+        }
+
+        /// <summary>Constructs a new runtime remote exception with the specified detail message
+        /// </summary>
+        /// <param name="s">the detailed message</param>
+        public RemoteRuntimeException(string s)
+            : base(s)
+        {
+        }
+
+        /// <summary>Constructs a new runtime remote exception with the specified cause.
+        /// The message is fixed to "Runtime Exception".</summary>
+        /// <param name="e">the cause</param>
+        public RemoteRuntimeException(Exception e)
+            : base("Runtime Exception", e)
+        {
+        }
+
+        /// <summary>Constructs a runtime remote exception from serialized data.
+        /// Without this constructor a [Serializable] exception cannot be deserialized.
+        /// </summary>
+        /// <param name="info">the serialized object data</param>
+        /// <param name="context">the source or destination of the serialized stream</param>
+        protected RemoteRuntimeException(
+            System.Runtime.Serialization.SerializationInfo info,
+            System.Runtime.Serialization.StreamingContext context)
+            : base(info, context)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Event/Alarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Event/Alarm.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Event/Alarm.cs
new file mode 100644
index 0000000..bb50883
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Event/Alarm.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.Time
+{
+    /// <summary>
+    /// Timer event: a Time that remembers the observer to notify when it is handled.
+    /// </summary>
+    public abstract class Alarm : Time
+    {
+        private readonly IObserver<Alarm> _handler;
+
+        /// <summary>
+        /// Creates an alarm for the given timestamp.
+        /// </summary>
+        /// <param name="timestamp">Timestamp forwarded to the Time base class</param>
+        /// <param name="handler">Observer that Handle() notifies</param>
+        public Alarm(long timestamp, IObserver<Alarm> handler)
+            : base(timestamp)
+        {
+            _handler = handler;
+        }
+
+        /// <summary>
+        /// Passes this alarm to the handler supplied at construction.
+        /// </summary>
+        public void Handle()
+        {
+            IObserver<Alarm> target = _handler;
+            target.OnNext(this);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Event/StartTime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Event/StartTime.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Event/StartTime.cs
new file mode 100644
index 0000000..aeb54a6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Event/StartTime.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Time
+{
+    /// <summary>
+    /// Time event marking the moment at which a component started.
+    /// </summary>
+    public class StartTime : Time
+    {
+        /// <summary>
+        /// Creates the event for the given timestamp.
+        /// </summary>
+        /// <param name="timeStamp">Timestamp forwarded to the Time base class</param>
+        public StartTime(long timeStamp)
+            : base(timeStamp)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Event/StopTime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Event/StopTime.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Event/StopTime.cs
new file mode 100644
index 0000000..cb1f3fd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Event/StopTime.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Time
+{
+    /// <summary>
+    /// Time event marking the moment at which a component stops.
+    /// </summary>
+    public class StopTime : Time
+    {
+        /// <summary>
+        /// Creates the event for the given timestamp.
+        /// </summary>
+        /// <param name="timeStamp">Timestamp forwarded to the Time base class</param>
+        public StopTime(long timeStamp)
+            : base(timeStamp)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/IClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/IClock.cs b/lang/cs/Org.Apache.REEF.Wake/Time/IClock.cs
new file mode 100644
index 0000000..9bdbba7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/IClock.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Impl;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.REEF.Wake.Time
+{
+    /// <summary>
+    /// Abstract clock that schedules alarms and declares the Tang named
+    /// parameters used to bind handlers for its start, stop, runtime start,
+    /// runtime stop and idle events.
+    /// </summary>
+    public abstract class IClock : IDisposable
+    {
+        /// <summary>
+        /// Schedule a TimerEvent at the given future offset
+        /// </summary>
+        /// <param name="offset">The offset in the future to schedule the alarm</param>
+        /// <param name="handler">The IObserver to be called</param>
+        public abstract void ScheduleAlarm(long offset, IObserver<Alarm> handler);
+
+        /// <summary>
+        /// Clock is idle if it has no future alarms set
+        /// </summary>
+        /// <returns>True if no future alarms are set, otherwise false</returns>
+        public abstract bool IsIdle();
+
+        /// <summary>
+        /// Dispose of the clock and all scheduled alarms
+        /// </summary>
+        public abstract void Dispose();
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the StartTime Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the start event", defaultClass: typeof(MissingStartHandlerHandler))]
+        public class StartHandler : Name<ISet<IObserver<StartTime>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the StopTime Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the stop event", defaultClass: typeof(LoggingEventHandler<StopTime>))]
+        public class StopHandler : Name<ISet<IObserver<StopTime>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the RuntimeStart Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the runtime start event", defaultClass: typeof(LoggingEventHandler<RuntimeStart>))]
+        public class RuntimeStartHandler : Name<ISet<IObserver<RuntimeStart>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the RuntimeStop Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the runtime stop event", defaultClass: typeof(LoggingEventHandler<RuntimeStop>))]
+        public class RuntimeStopHandler : Name<ISet<IObserver<RuntimeStop>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the IdleClock Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the Idle event", defaultClass: typeof(LoggingEventHandler<IdleClock>))]
+        public class IdleHandler : Name<ISet<IObserver<IdleClock>>>
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/ClientAlarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/ClientAlarm.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/ClientAlarm.cs
new file mode 100644
index 0000000..ab2ce53
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/ClientAlarm.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Time.Runtime.Event
+{
+    /// <summary>
+    /// Concrete Alarm type; adds no members of its own.
+    /// NOTE(review): presumably used for alarms scheduled by client code - confirm.
+    /// </summary>
+    public class ClientAlarm : Alarm
+    {
+        /// <summary>
+        /// Creates the alarm, forwarding both arguments to the Alarm base class.
+        /// </summary>
+        /// <param name="timestamp">Timestamp of the alarm</param>
+        /// <param name="handler">Observer notified when the alarm is handled</param>
+        public ClientAlarm(long timestamp, IObserver<Alarm> handler)
+            : base(timestamp, handler)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/IdleClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/IdleClock.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/IdleClock.cs
new file mode 100644
index 0000000..ff9872d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/IdleClock.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Time.Runtime.Event
+{
+    /// <summary>
+    /// Time event type delivered to observers bound through IClock.IdleHandler.
+    /// </summary>
+    public class IdleClock : Time
+    {
+        /// <summary>
+        /// Creates the event for the given timestamp.
+        /// </summary>
+        /// <param name="timestamp">Timestamp forwarded to the Time base class</param>
+        public IdleClock(long timestamp)
+            : base(timestamp)
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeAlarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeAlarm.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeAlarm.cs
new file mode 100644
index 0000000..1f228dc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeAlarm.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Time.Runtime.Event
+{
+ /// <summary>
+ /// Alarm subclass that adds no members of its own; it only forwards its
+ /// arguments to the Alarm constructor.
+ /// </summary>
+ public class RuntimeAlarm : Alarm
+ {
+ /// <summary>
+ /// Creates a runtime alarm.
+ /// </summary>
+ /// <param name="timestamp">The timestamp passed to the Alarm base class</param>
+ /// <param name="handler">The observer passed to the Alarm base class</param>
+ public RuntimeAlarm(long timestamp, IObserver<Alarm> handler)
+ : base(timestamp, handler)
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeStart.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeStart.cs
new file mode 100644
index 0000000..136d62d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeStart.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Wake.Time.Runtime.Event
+{
+ /// <summary>
+ /// Time event that RuntimeClock.Run publishes first, before StartTime.
+ /// </summary>
+ public class RuntimeStart : Time
+ {
+ /// <summary>
+ /// Creates a runtime start event stamped with the given time.
+ /// </summary>
+ /// <param name="timeStamp">The timestamp forwarded to the Time base class</param>
+ public RuntimeStart(long timeStamp)
+ : base(timeStamp)
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeStop.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeStop.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeStop.cs
new file mode 100644
index 0000000..75cc41b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/Event/RuntimeStop.cs
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake.Time.Runtime.Event
+{
+ /// <summary>
+ /// Time event published by RuntimeClock.Run once its event loop has ended.
+ /// May carry the exception associated with the stop.
+ /// </summary>
+ public class RuntimeStop : Time
+ {
+ /// <summary>
+ /// Creates a runtime stop event with an associated exception.
+ /// </summary>
+ /// <param name="timestamp">The timestamp forwarded to the Time base class</param>
+ /// <param name="e">The exception to expose through Exception; may be null</param>
+ public RuntimeStop(long timestamp, Exception e)
+ : base(timestamp)
+ {
+ Exception = e;
+ }
+
+ /// <summary>
+ /// Creates a runtime stop event without an exception.
+ /// </summary>
+ /// <param name="timestamp">The timestamp forwarded to the Time base class</param>
+ public RuntimeStop(long timestamp)
+ : this(timestamp, null)
+ {
+ }
+
+ /// <summary>
+ /// Gets the exception supplied at construction, or null if none was given.
+ /// </summary>
+ public Exception Exception { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/ITimer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/ITimer.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/ITimer.cs
new file mode 100644
index 0000000..bfbbf05
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/ITimer.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Wake.Time.Runtime
+{
+ /// <summary>
+ /// Source of time values for the clock. RealTimer is bound as the default implementation.
+ /// </summary>
+ [DefaultImplementation(typeof(RealTimer))]
+ public interface ITimer
+ {
+ /// <summary>
+ /// Gets the timer's notion of "now".
+ /// </summary>
+ long CurrentTime { get; }
+
+ /// <summary>
+ /// Computes how far the given time lies from the current time.
+ /// </summary>
+ /// <param name="time">The time to compare against the current time</param>
+ /// <returns>The difference between the given time and the current time</returns>
+ long GetDuration(long time);
+
+ /// <summary>
+ /// Tells whether the given time has already been reached.
+ /// </summary>
+ /// <param name="time">The time to check</param>
+ /// <returns>True if the given time has passed, otherwise false</returns>
+ bool IsReady(long time);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/LogicalTimer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/LogicalTimer.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/LogicalTimer.cs
new file mode 100644
index 0000000..b1cb543
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/LogicalTimer.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.REEF.Wake.Time.Runtime
+{
+ /// <summary>
+ /// ITimer stub used for testing purposes: the reported time is pinned at zero
+ /// and every timestamp is reported as ready.
+ /// </summary>
+ public class LogicalTimer : ITimer
+ {
+ // The only time value this timer ever reports
+ private const long FixedTime = 0;
+
+ [Inject]
+ public LogicalTimer()
+ {
+ }
+
+ /// <summary>
+ /// Gets the current time, which is always zero.
+ /// </summary>
+ public long CurrentTime
+ {
+ get
+ {
+ return FixedTime;
+ }
+ }
+
+ /// <summary>
+ /// Returns zero regardless of the argument.
+ /// </summary>
+ /// <param name="time">Ignored</param>
+ public long GetDuration(long time)
+ {
+ return FixedTime;
+ }
+
+ /// <summary>
+ /// Returns true regardless of the argument.
+ /// </summary>
+ /// <param name="time">Ignored</param>
+ public bool IsReady(long time)
+ {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RealTimer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RealTimer.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RealTimer.cs
new file mode 100644
index 0000000..6b5213b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RealTimer.cs
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.REEF.Wake.Time.Runtime
+{
+ /// <summary>
+ /// ITimer implementation backed by the system clock.
+ /// </summary>
+ public class RealTimer : ITimer
+ {
+ [Inject]
+ public RealTimer()
+ {
+ }
+
+ /// <summary>
+ /// Gets the current UTC time in milliseconds, counted from DateTime.MinValue
+ /// (0001-01-01), not from the Unix epoch.
+ /// </summary>
+ /// <remarks>
+ /// UTC is used rather than local time so that daylight-saving or time-zone
+ /// changes cannot shift timestamps and fire alarms early or late.
+ /// </remarks>
+ public long CurrentTime
+ {
+ get { return DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond; }
+ }
+
+ /// <summary>
+ /// Gets the difference between the given time and the current time
+ /// </summary>
+ /// <param name="time">The time to compare against the current time, on the same scale as CurrentTime</param>
+ /// <returns>time minus CurrentTime; positive while the given time is still in the future</returns>
+ public long GetDuration(long time)
+ {
+ return time - CurrentTime;
+ }
+
+ /// <summary>
+ /// Checks if the given time has already passed.
+ /// </summary>
+ /// <param name="time">The time to check if it has passed or not</param>
+ /// <returns>True if the remaining duration is zero or negative, otherwise false</returns>
+ public bool IsReady(long time)
+ {
+ return GetDuration(time) <= 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
new file mode 100644
index 0000000..0871521
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Reactive.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Wake.RX.Impl;
+using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
+
+namespace Org.Apache.REEF.Wake.Time.Runtime.Event
+{
+ /// <summary>
+ /// IClock implementation that keeps Time events in a sorted schedule and
+ /// dispatches them from the thread that calls Run().
+ /// </summary>
+ public class RuntimeClock : IClock
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(RuntimeClock));
+
+ private readonly ITimer _timer;
+ private readonly PubSubSubject<Time> _handlers;
+
+ // Pending events ordered by Time.CompareTo; the same object is the monitor guarding the schedule
+ private readonly ISet<Time> _schedule;
+
+ private readonly IInjectionFuture<ISet<IObserver<StartTime>>> _startHandler;
+ private readonly IInjectionFuture<ISet<IObserver<StopTime>>> _stopHandler;
+ private IInjectionFuture<ISet<IObserver<RuntimeStart>>> _runtimeStartHandler;
+ private IInjectionFuture<ISet<IObserver<RuntimeStop>>> _runtimeStopHandler;
+ private readonly IInjectionFuture<ISet<IObserver<IdleClock>>> _idleHandler;
+
+ // Set by Dispose() while holding the _schedule lock
+ private bool _disposed;
+
+ /// <summary>
+ /// Create a new RuntimeClock with injectable IObservers
+ /// </summary>
+ /// <param name="timer">The runtime clock timer</param>
+ /// <param name="startHandler">The start handler</param>
+ /// <param name="stopHandler">The stop handler</param>
+ /// <param name="runtimeStartHandler">The runtime start handler</param>
+ /// <param name="runtimeStopHandler">The runtime stop handler</param>
+ /// <param name="idleHandler">The idle handler</param>
+ [Inject]
+ internal RuntimeClock(
+ ITimer timer,
+ [Parameter(typeof(StartHandler))] IInjectionFuture<ISet<IObserver<StartTime>>> startHandler,
+ [Parameter(typeof(StopHandler))] IInjectionFuture<ISet<IObserver<StopTime>>> stopHandler,
+ [Parameter(typeof(RuntimeStartHandler))] IInjectionFuture<ISet<IObserver<RuntimeStart>>> runtimeStartHandler,
+ [Parameter(typeof(RuntimeStopHandler))] IInjectionFuture<ISet<IObserver<RuntimeStop>>> runtimeStopHandler,
+ [Parameter(typeof(IdleHandler))] IInjectionFuture<ISet<IObserver<IdleClock>>> idleHandler)
+ {
+ _timer = timer;
+ _schedule = new SortedSet<Time>();
+ _handlers = new PubSubSubject<Time>();
+
+ _startHandler = startHandler;
+ _stopHandler = stopHandler;
+ _runtimeStartHandler = runtimeStartHandler;
+ _runtimeStopHandler = runtimeStopHandler;
+ _idleHandler = idleHandler;
+ }
+
+ /// <summary>
+ /// Gets or sets the injection future that supplies the RuntimeStart observers
+ /// subscribed when Run() starts.
+ /// </summary>
+ public IInjectionFuture<ISet<IObserver<RuntimeStart>>> InjectedRuntimeStartHandler
+ {
+ get { return _runtimeStartHandler; }
+ set { _runtimeStartHandler = value; }
+ }
+
+ /// <summary>
+ /// Gets or sets the injection future that supplies the RuntimeStop observers
+ /// subscribed when Run() starts.
+ /// </summary>
+ public IInjectionFuture<ISet<IObserver<RuntimeStop>>> InjectedRuntimeStopHandler
+ {
+ get { return _runtimeStopHandler; }
+ set { _runtimeStopHandler = value; }
+ }
+
+ /// <summary>
+ /// Schedule a TimerEvent at the given future offset.
+ /// The call is silently ignored once the clock has been disposed.
+ /// </summary>
+ /// <param name="offset">The offset in the future to schedule the alarm</param>
+ /// <param name="handler">The IObserver to be called</param>
+ public override void ScheduleAlarm(long offset, IObserver<Alarm> handler)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+ if (handler == null)
+ {
+ Exceptions.Throw(new ArgumentNullException("handler"), LOGGER);
+ }
+
+ lock (_schedule)
+ {
+ // Re-check under the lock: Dispose() may have run since the unlocked check above,
+ // and an alarm queued behind its StopTime would never be processed.
+ if (_disposed)
+ {
+ return;
+ }
+
+ _schedule.Add(new ClientAlarm(_timer.CurrentTime + offset, handler));
+ Monitor.PulseAll(_schedule);
+ }
+ }
+
+ /// <summary>
+ /// Clock is idle if it has no future alarms set
+ /// </summary>
+ /// <returns>True if no future alarms are set, otherwise false</returns>
+ public override bool IsIdle()
+ {
+ lock (_schedule)
+ {
+ return _schedule.Count == 0;
+ }
+ }
+
+ /// <summary>
+ /// Dispose of the clock and all scheduled alarms.
+ /// The schedule is replaced by a single StopTime event, which ends the Run() loop.
+ /// </summary>
+ public override void Dispose()
+ {
+ lock (_schedule)
+ {
+ _schedule.Clear();
+ _schedule.Add(new StopTime(_timer.CurrentTime));
+ Monitor.PulseAll(_schedule);
+ _disposed = true;
+ }
+ }
+
+ /// <summary>
+ /// Register the IObserver for the particular Time event.
+ /// The call is silently ignored once the clock has been disposed.
+ /// </summary>
+ /// <param name="observer">The handler to register</param>
+ public void RegisterObserver<U>(IObserver<U> observer) where U : Time
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _handlers.Subscribe(observer);
+ }
+
+ /// <summary>
+ /// Start the RuntimeClock.
+ /// Clock will continue to run and handle events until it has been disposed.
+ /// If publishing or handling an event throws, a RuntimeStop carrying that
+ /// exception is published and the exception is then rethrown to the caller.
+ /// </summary>
+ public void Run()
+ {
+ SubscribeHandlers();
+
+ try
+ {
+ _handlers.OnNext(new RuntimeStart(_timer.CurrentTime));
+ _handlers.OnNext(new StartTime(_timer.CurrentTime));
+
+ while (true)
+ {
+ lock (_schedule)
+ {
+ if (IsIdle())
+ {
+ _handlers.OnNext(new IdleClock(_timer.CurrentTime));
+ }
+
+ // Blocks and releases lock until it receives the next event
+ Time alarm = GetNextEvent();
+ ProcessEvent(alarm);
+
+ if (alarm is StopTime)
+ {
+ break;
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ // Let RuntimeStop observers see the failure, then propagate it unchanged
+ _handlers.OnNext(new RuntimeStop(_timer.CurrentTime, e));
+ throw;
+ }
+
+ _handlers.OnNext(new RuntimeStop(_timer.CurrentTime));
+ }
+
+ /// <summary>
+ /// Register the event handlers
+ /// </summary>
+ private void SubscribeHandlers()
+ {
+ Subscribe(_startHandler.Get());
+ Subscribe(_stopHandler.Get());
+ Subscribe(_runtimeStartHandler.Get());
+ Subscribe(_runtimeStopHandler.Get());
+ Subscribe(_idleHandler.Get());
+ }
+
+ /// <summary>
+ /// Subscribe a set of IObservers for a particular Time event
+ /// </summary>
+ /// <param name="observers">The set of observers to subscribe</param>
+ private void Subscribe<U>(ISet<IObserver<U>> observers) where U : Time
+ {
+ foreach (IObserver<U> observer in observers)
+ {
+ _handlers.Subscribe(observer);
+ }
+ }
+
+ /// <summary>
+ /// Wait until the first scheduled alarm is ready to be handled
+ /// Assumes that we have a lock on the _schedule SortedSet
+ /// </summary>
+ /// <returns>The earliest scheduled event, already removed from the schedule</returns>
+ private Time GetNextEvent()
+ {
+ // Wait for an alarm to be scheduled on the condition variable Count
+ while (_schedule.Count == 0)
+ {
+ Monitor.Wait(_schedule);
+ }
+
+ // Once the alarm is scheduled, wait for the prescribed amount of time.
+ // If a new alarm is scheduled with a shorter duration, Wait will preempt
+ // and duration will update to reflect the new alarm's timestamp
+ for (long duration = _timer.GetDuration(_schedule.First().TimeStamp);
+ duration > 0;
+ duration = _timer.GetDuration(_schedule.First().TimeStamp))
+ {
+ Monitor.Wait(_schedule, TimeSpan.FromMilliseconds(duration));
+ }
+
+ Time time = _schedule.First();
+ _schedule.Remove(time);
+ return time;
+ }
+
+ /// <summary>
+ /// Process the next Time event.
+ /// Alarms invoke their own handler; every other event is published to the subscribed observers.
+ /// </summary>
+ /// <param name="time">The Time event to handle</param>
+ private void ProcessEvent(Time time)
+ {
+ Alarm alarm = time as Alarm;
+ if (alarm != null)
+ {
+ alarm.Handle();
+ }
+ else
+ {
+ _handlers.OnNext(time);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs
new file mode 100644
index 0000000..16d3d57
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Time.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Wake.Time
+{
+ /// <summary>
+ /// Base class for timestamped clock events. Instances are ordered by timestamp,
+ /// with ties broken by creation order, so two distinct instances never compare as equal.
+ /// </summary>
+ public abstract class Time : IComparable<Time>
+ {
+ // Source of the per-instance sequence numbers used as the CompareTo tie-breaker
+ private static long _nextSequenceId;
+
+ // Unique number assigned at construction. Hash codes are not unique, so using them
+ // as a tie-breaker could make two different events compare as 0, and a sorted set
+ // would then silently drop one of them.
+ private readonly long _sequenceId;
+
+ /// <summary>
+ /// Creates a Time event with the given timestamp.
+ /// </summary>
+ /// <param name="timeStamp">The timestamp of the event</param>
+ public Time(long timeStamp)
+ {
+ TimeStamp = timeStamp;
+ _sequenceId = System.Threading.Interlocked.Increment(ref _nextSequenceId);
+ }
+
+ /// <summary>
+ /// Gets the timestamp supplied at construction.
+ /// </summary>
+ public long TimeStamp { get; private set; }
+
+ /// <summary>
+ /// Formats the event as "TypeName:[TimeStamp]" using the invariant culture.
+ /// </summary>
+ public override string ToString()
+ {
+ return string.Format(CultureInfo.InvariantCulture, "{0}:[{1}]", GetType().Name, TimeStamp);
+ }
+
+ public override int GetHashCode()
+ {
+ return base.GetHashCode();
+ }
+
+ /// <summary>
+ /// Two Time objects are equal when CompareTo reports 0, which only happens
+ /// for the same instance.
+ /// </summary>
+ /// <param name="obj">The object to compare with</param>
+ /// <returns>True if obj is this instance, otherwise false</returns>
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+ Time other = obj as Time;
+ if (other != null)
+ {
+ return CompareTo(other) == 0;
+ }
+ return false;
+ }
+
+ /// <summary>
+ /// Orders events by timestamp, then by creation order.
+ /// </summary>
+ /// <param name="other">The event to compare with; null sorts before every instance</param>
+ /// <returns>A negative value, zero, or a positive value, as for IComparable</returns>
+ public int CompareTo(Time other)
+ {
+ if (ReferenceEquals(other, null))
+ {
+ return 1;
+ }
+
+ int byTimeStamp = TimeStamp.CompareTo(other.TimeStamp);
+ if (byTimeStamp != 0)
+ {
+ return byTimeStamp;
+ }
+
+ return _sequenceId.CompareTo(other._sequenceId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/Actionable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/Actionable.cs b/lang/cs/Org.Apache.REEF.Wake/Util/Actionable.cs
new file mode 100644
index 0000000..ff16af9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/Actionable.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// Wraps a ThreadStart delegate so that it can be invoked through Call().
+ /// </summary>
+ public class Actionable
+ {
+ // Null when the instance was created through the parameterless constructor
+ private readonly ThreadStart _threadStart;
+
+ /// <summary>
+ /// Creates an Actionable with no delegate; Call() then does nothing.
+ /// </summary>
+ public Actionable()
+ {
+ }
+
+ /// <summary>
+ /// Creates an Actionable around the given delegate.
+ /// </summary>
+ /// <param name="threadStart">The delegate that Call() invokes</param>
+ internal Actionable(ThreadStart threadStart)
+ {
+ _threadStart = threadStart;
+ }
+
+ /// <summary>
+ /// Invokes the wrapped delegate on the calling thread, if there is one.
+ /// </summary>
+ public void Call()
+ {
+ // Guard against the parameterless constructor, which leaves the delegate null
+ ThreadStart action = _threadStart;
+ if (action != null)
+ {
+ action();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/Disposable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/Disposable.cs b/lang/cs/Org.Apache.REEF.Wake/Util/Disposable.cs
new file mode 100644
index 0000000..c289eea
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/Disposable.cs
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// Generates IDisposables from a factory method
+ /// </summary>
+ internal class Disposable : IDisposable
+ {
+ private readonly Action _disposeFunction;
+
+ // True once Dispose() has been called
+ private bool _disposed;
+
+ private Disposable(Action disposeFunction)
+ {
+ _disposeFunction = disposeFunction;
+ _disposed = false;
+ }
+
+ /// <summary>
+ /// Factory method to create an IDisposable from a function.
+ /// </summary>
+ /// <param name="disposeFunction">The function to call when disposing</param>
+ /// <returns>An IDisposable from the given dispose function</returns>
+ public static IDisposable Create(Action disposeFunction)
+ {
+ return new Disposable(disposeFunction);
+ }
+
+ /// <summary>
+ /// Dispose of resources by calling the supplied dispose function.
+ /// The function runs on the first call only; later calls do nothing.
+ /// </summary>
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ // Mark first so that a repeated or re-entrant Dispose cannot run the function again
+ _disposed = true;
+
+ if (_disposeFunction != null)
+ {
+ _disposeFunction();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/FixedThreadPoolTaskService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/FixedThreadPoolTaskService.cs b/lang/cs/Org.Apache.REEF.Wake/Util/FixedThreadPoolTaskService.cs
new file mode 100644
index 0000000..49f9a11
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/FixedThreadPoolTaskService.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// ITaskService that runs submitted functions through a TaskFactory bound to a
+ /// LimitedConcurrencyLevelTaskScheduler.
+ /// </summary>
+ public class FixedThreadPoolTaskService : ITaskService
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(FixedThreadPoolTaskService));
+
+ private readonly TaskFactory factory;
+
+ // Every task returned by Submit; the list is also the lock guarding shuttingDown
+ private readonly List<Task> tasks = new List<Task>();
+ private bool shuttingDown;
+
+ /// <summary>
+ /// Creates the service.
+ /// </summary>
+ /// <param name="maxDegreeOfParallelism">The value passed to the LimitedConcurrencyLevelTaskScheduler constructor</param>
+ internal FixedThreadPoolTaskService(int maxDegreeOfParallelism)
+ {
+ LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism);
+ factory = new TaskFactory(lcts);
+ }
+
+ /// <summary>
+ /// Waits for all tasks submitted so far to complete.
+ /// </summary>
+ /// <param name="n">Not used by this implementation</param>
+ /// <param name="unit">The maximum time to wait</param>
+ /// <returns>True if there are no tasks or all of them completed within the timeout, otherwise false</returns>
+ /// <remarks>
+ /// NOTE(review): n is ignored and unit alone is the timeout. Confirm whether
+ /// the timeout was meant to be n times unit.
+ /// </remarks>
+ public bool AwaitTermination(long n, TimeSpan unit)
+ {
+ Task[] allTasks;
+ lock (tasks)
+ {
+ if (tasks.Count == 0)
+ {
+ return true;
+ }
+ allTasks = tasks.ToArray();
+ }
+ return Task.WaitAll(allTasks, unit);
+ }
+
+ /// <summary>
+ /// Same as Shutdown(): tasks that are queued or running are not cancelled.
+ /// </summary>
+ public void ShutdownNow()
+ {
+ Shutdown();
+ }
+
+ /// <summary>
+ /// Stops the service from accepting new work; later Submit calls fail.
+ /// </summary>
+ public void Shutdown()
+ {
+ lock (tasks)
+ {
+ shuttingDown = true;
+ }
+ }
+
+ /// <summary>
+ /// Starts the given function on the service's task factory and records the task.
+ /// </summary>
+ /// <param name="c">The function to run</param>
+ /// <returns>The task running the function</returns>
+ /// <remarks>
+ /// Once Shutdown has been called, an InvalidOperationException is passed to Exceptions.Throw.
+ /// NOTE(review): nothing in this class calls RemoveTask, so recorded tasks
+ /// stay in the list unless a caller removes them.
+ /// </remarks>
+ public Task<T> Submit<T>(Func<T> c)
+ {
+ lock (tasks)
+ {
+ if (shuttingDown)
+ {
+ Exceptions.Throw(new InvalidOperationException("Shutting down"), LOGGER);
+ }
+
+ // No per-task CancellationTokenSource: the previous one was never cancelled,
+ // stored or disposed, so it had no effect beyond the allocation.
+ Task<T> task = factory.StartNew(c);
+ tasks.Add(task);
+ return task;
+ }
+ }
+
+ /// <summary>
+ /// Runs the delegate synchronously on the calling thread via Actionable.Call().
+ /// </summary>
+ /// <param name="threadStart">The delegate to run</param>
+ public void Execute(ThreadStart threadStart)
+ {
+ new Actionable(threadStart).Call();
+ }
+
+ /// <summary>
+ /// Removes a task from the list of recorded tasks.
+ /// </summary>
+ /// <param name="task">The task to remove</param>
+ internal void RemoveTask(Task task)
+ {
+ lock (tasks)
+ {
+ tasks.Remove(task);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/IStartable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/IStartable.cs b/lang/cs/Org.Apache.REEF.Wake/Util/IStartable.cs
new file mode 100644
index 0000000..81214a4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/IStartable.cs
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// Implemented by components that expose a Start operation.
+ /// </summary>
+ public interface IStartable
+ {
+ /// <summary>
+ /// Starts the component.
+ /// </summary>
+ void Start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/ITaskService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/ITaskService.cs b/lang/cs/Org.Apache.REEF.Wake/Util/ITaskService.cs
new file mode 100644
index 0000000..ae9fabc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/ITaskService.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// Service that accepts units of work to run and can be shut down.
+ /// </summary>
+ public interface ITaskService
+ {
+ /// <summary>
+ /// Shuts the service down.
+ /// </summary>
+ void Shutdown();
+
+ /// <summary>
+ /// Executes the given delegate.
+ /// </summary>
+ /// <param name="threadStart">The delegate to execute</param>
+ void Execute(ThreadStart threadStart);
+
+ /// <summary>
+ /// Submits a function that produces a value.
+ /// </summary>
+ /// <param name="ob">The function to run</param>
+ /// <returns>A task representing the submitted function</returns>
+ Task<T> Submit<T>(Func<T> ob);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs b/lang/cs/Org.Apache.REEF.Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs
new file mode 100644
index 0000000..f6fd482
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// A <see cref="TaskScheduler"/> that drains its task queue on ThreadPool threads while never
+ /// having more than a fixed number of worker delegates queued or running at the same time.
+ /// </summary>
+ internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(LimitedConcurrencyLevelTaskScheduler));
+
+     /// <summary>Whether the current thread is processing work items.</summary>
+     [ThreadStatic]
+     private static bool _currentThreadIsProcessingItems;
+
+     /// <summary>The list of tasks to be executed.</summary>
+     private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
+
+     /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
+     private readonly int _maxDegreeOfParallelism;
+
+     /// <summary>The number of worker delegates currently queued to, or running on, the ThreadPool.</summary>
+     private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
+
+     /// <summary>
+     /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
+     /// specified degree of parallelism. A value below 1 is reported as an
+     /// <see cref="ArgumentOutOfRangeException"/> through <c>Exceptions.Throw</c>.
+     /// </summary>
+     /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
+     public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
+     {
+         if (maxDegreeOfParallelism < 1)
+         {
+             Exceptions.Throw(new ArgumentOutOfRangeException("maxDegreeOfParallelism"), LOGGER);
+         }
+         _maxDegreeOfParallelism = maxDegreeOfParallelism;
+     }
+
+     /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
+     public sealed override int MaximumConcurrencyLevel
+     {
+         get
+         {
+             return _maxDegreeOfParallelism;
+         }
+     }
+
+     /// <summary>Queues a task to the scheduler.</summary>
+     /// <param name="task">The task to be queued.</param>
+     protected sealed override void QueueTask(Task task)
+     {
+         // Add the task to the list of tasks to be processed. If there aren't enough
+         // delegates currently queued or running to process tasks, schedule another.
+         lock (_tasks)
+         {
+             _tasks.AddLast(task);
+             if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
+             {
+                 ++_delegatesQueuedOrRunning;
+                 NotifyThreadPoolOfPendingWork();
+             }
+         }
+     }
+
+     /// <summary>Attempts to execute the specified task on the current thread.</summary>
+     /// <param name="task">The task to be executed.</param>
+     /// <param name="taskWasPreviouslyQueued">
+     /// Whether the task may already sit in this scheduler's queue; if so it has to be
+     /// removed from the queue before it may run inline.
+     /// </param>
+     /// <returns>Whether the task could be executed on the current thread.</returns>
+     protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
+     {
+         // If this thread isn't already processing a task, we don't support inlining
+         if (!_currentThreadIsProcessingItems)
+         {
+             return false;
+         }
+
+         // A previously queued task that can no longer be removed from the queue has already
+         // been claimed by a worker delegate; leave its execution to that worker.
+         if (taskWasPreviouslyQueued && !TryDequeue(task))
+         {
+             return false;
+         }
+
+         // Try to run the task.
+         return TryExecuteTask(task);
+     }
+
+     /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
+     /// <param name="task">The task to be removed.</param>
+     /// <returns>Whether the task could be found and removed.</returns>
+     protected sealed override bool TryDequeue(Task task)
+     {
+         lock (_tasks)
+         {
+             return _tasks.Remove(task);
+         }
+     }
+
+     /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
+     /// <returns>A snapshot (array copy) of the tasks currently in the queue.</returns>
+     /// <exception cref="NotSupportedException">The queue lock could not be taken without blocking.</exception>
+     protected sealed override IEnumerable<Task> GetScheduledTasks()
+     {
+         bool lockTaken = false;
+         try
+         {
+             // Non-blocking attempt only: never wait on the queue lock here.
+             Monitor.TryEnter(_tasks, ref lockTaken);
+             if (!lockTaken)
+             {
+                 throw new NotSupportedException();
+             }
+
+             return _tasks.ToArray();
+         }
+         finally
+         {
+             if (lockTaken)
+             {
+                 Monitor.Exit(_tasks);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Informs the ThreadPool that there's work to be executed for this scheduler.
+     /// The queued delegate keeps pulling tasks until the queue is empty, then
+     /// decrements the worker count and exits.
+     /// </summary>
+     private void NotifyThreadPoolOfPendingWork()
+     {
+         ThreadPool.UnsafeQueueUserWorkItem(_ =>
+         {
+             // Note that the current thread is now processing work items.
+             // This is necessary to enable inlining of tasks into this thread.
+             _currentThreadIsProcessingItems = true;
+             try
+             {
+                 // Process all available items in the queue.
+                 while (true)
+                 {
+                     Task item;
+                     lock (_tasks)
+                     {
+                         // When there are no more items to be processed,
+                         // note that we're done processing, and get out.
+                         if (_tasks.Count == 0)
+                         {
+                             --_delegatesQueuedOrRunning;
+                             break;
+                         }
+
+                         // Get the next item from the queue
+                         item = _tasks.First.Value;
+                         _tasks.RemoveFirst();
+                     }
+
+                     // Execute the task we pulled out of the queue (outside the lock)
+                     base.TryExecuteTask(item);
+                 }
+             }
+             finally
+             {
+                 // We're done processing items on the current thread
+                 _currentThreadIsProcessingItems = false;
+             }
+         }, null);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/NetworkUtils.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/NetworkUtils.cs b/lang/cs/Org.Apache.REEF.Wake/Util/NetworkUtils.cs
new file mode 100644
index 0000000..bd35952
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/NetworkUtils.cs
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// Static helpers for looking up the machine's local IPv4 address and for picking random port numbers.
+ /// </summary>
+ public class NetworkUtils
+ {
+     // Guards _random: a System.Random instance is not safe for concurrent use, and
+     // GenerateRandomPort is a public static entry point that any thread may call.
+     private static readonly object _randomLock = new object();
+     private static readonly Random _random = new Random();
+
+     // Cached result of the first successful lookup in LocalIPAddress.
+     private static IPAddress _localAddress;
+
+     /// <summary>
+     /// Returns an IPv4 address of the machine: the host's addresses are filtered to
+     /// <see cref="AddressFamily.InterNetwork"/>, ordered by their string representation,
+     /// and the first one is taken. The result is cached after the first lookup.
+     /// </summary>
+     /// <returns>The machine's local IP Address</returns>
+     /// <exception cref="InvalidOperationException">The host has no IPv4 address (raised by <c>First()</c>).</exception>
+     public static IPAddress LocalIPAddress
+     {
+         get
+         {
+             if (_localAddress == null)
+             {
+                 IPAddress[] localIps = Dns.GetHostAddresses(Dns.GetHostName());
+                 _localAddress = localIps.Where(i => i.AddressFamily.Equals(AddressFamily.InterNetwork))
+                     .OrderBy(ip => ip.ToString())
+                     .First();
+             }
+
+             return _localAddress;
+         }
+     }
+
+     /// <summary>
+     /// Generate a random port between low (inclusive) and high (exclusive)
+     /// </summary>
+     /// <param name="low">The inclusive lower bound of the port range</param>
+     /// <param name="high">The exclusive upper bound of the port range</param>
+     /// <returns>The randomly generated port</returns>
+     /// <exception cref="ArgumentOutOfRangeException"><paramref name="low"/> is greater than <paramref name="high"/>.</exception>
+     public static int GenerateRandomPort(int low, int high)
+     {
+         lock (_randomLock)
+         {
+             return _random.Next(low, high);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/SerializationHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/SerializationHelper.cs b/lang/cs/Org.Apache.REEF.Wake/Util/SerializationHelper.cs
new file mode 100644
index 0000000..1c29382
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/SerializationHelper.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// Converts objects to and from byte arrays by delegating to the ProtoBuf <c>Serializer</c>.
+ /// </summary>
+ public class SerializationHelper
+ {
+     /// <summary>Serializes <paramref name="t"/> into a newly allocated byte array.</summary>
+     /// <typeparam name="T">The type of the object being serialized.</typeparam>
+     /// <param name="t">The object to serialize.</param>
+     /// <returns>The bytes written by the serializer.</returns>
+     public static byte[] Serialize<T>(T t)
+     {
+         using (var buffer = new MemoryStream())
+         {
+             Serializer.Serialize(buffer, t);
+             byte[] payload = buffer.ToArray();
+             return payload;
+         }
+     }
+
+     /// <summary>Reads an instance of <typeparamref name="T"/> back from <paramref name="bytes"/>.</summary>
+     /// <typeparam name="T">The type to deserialize into.</typeparam>
+     /// <param name="bytes">The serialized form, wrapped in a <see cref="MemoryStream"/> for reading.</param>
+     /// <returns>The object produced by the serializer.</returns>
+     public static T Deserialize<T>(byte[] bytes)
+     {
+         using (var input = new MemoryStream(bytes))
+         {
+             T restored = Serializer.Deserialize<T>(input);
+             return restored;
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/TaskExtensions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/TaskExtensions.cs b/lang/cs/Org.Apache.REEF.Wake/Util/TaskExtensions.cs
new file mode 100644
index 0000000..69e4972
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/TaskExtensions.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>Extension methods for <see cref="Task"/>.</summary>
+ public static class TaskExtensions
+ {
+     /// <summary>
+     /// Intentional no-op: the task is neither awaited nor inspected, and its exceptions are not observed.
+     /// NOTE(review): presumably exists so call sites can mark a task as fire-and-forget - confirm with callers.
+     /// </summary>
+     /// <param name="task">The task to ignore; it is never dereferenced.</param>
+     public static void Forget(this Task task)
+     {
+         // Deliberately empty.
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/Util/TimeHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Util/TimeHelper.cs b/lang/cs/Org.Apache.REEF.Wake/Util/TimeHelper.cs
new file mode 100644
index 0000000..2f913cc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Util/TimeHelper.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Org.Apache.REEF.Wake.Util
+{
+ /// <summary>
+ /// Tick-based time constants and conversion helpers.
+ /// NOTE(review): a .NET tick is 100 ns, so multiplying ticks by <see cref="TicksPerNanoSecond"/> (0.01)
+ /// or dividing them by 100 yields ticks/100, not a nanosecond count (which would be ticks * 100).
+ /// The members below are kept exactly as they were; confirm the intended unit with callers before changing them.
+ /// </summary>
+ public class TimeHelper
+ {
+     /// <summary>Number of ticks in one millisecond.</summary>
+     public const long TicksPerMilliSecond = 10000;
+
+     /// <summary>Number of ticks in one microsecond.</summary>
+     public const long TicksPerMicroSecond = 10;
+
+     /// <summary>Fraction of a tick that corresponds to one nanosecond.</summary>
+     public const double TicksPerNanoSecond = .01;
+
+     /// <summary>Gets the tick count of <see cref="DateTime.Now"/> divided by 100 (integer division).</summary>
+     public static long CurrentTimeToNanoSeconds
+     {
+         get
+         {
+             long nowTicks = DateTime.Now.Ticks;
+             return nowTicks / 100;
+         }
+     }
+
+     /// <summary>
+     /// Scales the tick count of <paramref name="timeSpan"/> by <see cref="TicksPerNanoSecond"/>
+     /// and truncates the result to a <c>long</c>.
+     /// </summary>
+     /// <param name="timeSpan">The interval to convert.</param>
+     /// <returns>The scaled tick count, truncated toward zero.</returns>
+     public static long AsLongNanoSeconds(TimeSpan timeSpan)
+     {
+         return (long)(TicksPerNanoSecond * timeSpan.Ticks);
+     }
+
+     /// <summary>
+     /// Scales the tick count of <paramref name="timeSpan"/> by <see cref="TicksPerNanoSecond"/>.
+     /// </summary>
+     /// <param name="timeSpan">The interval to convert.</param>
+     /// <returns>The scaled tick count as a <c>double</c>.</returns>
+     public static double AsDoubleNanoSeconds(TimeSpan timeSpan)
+     {
+         return TicksPerNanoSecond * timeSpan.Ticks;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Wake/WakeRuntimeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/WakeRuntimeException.cs b/lang/cs/Org.Apache.REEF.Wake/WakeRuntimeException.cs
new file mode 100644
index 0000000..2e21a60
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/WakeRuntimeException.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Wake
+{
+ /// <summary>Wake runtime exception</summary>
+ [System.Serializable]
+ public class WakeRuntimeException : Exception
+ {
+     /// <summary>Constructs a new Wake runtime exception with the specified detail message and cause
+     /// </summary>
+     /// <param name="s">the detailed message</param>
+     /// <param name="e">the cause</param>
+     public WakeRuntimeException(string s, Exception e)
+         : base(s, e)
+     {
+     }
+
+     /// <summary>Constructs a new Wake runtime exception with the specified detail message
+     /// </summary>
+     /// <param name="s">the detailed message</param>
+     public WakeRuntimeException(string s)
+         : base(s)
+     {
+     }
+
+     /// <summary>
+     /// Constructs a new Wake runtime exception with the specified cause and the
+     /// fixed message "Runtime Exception".
+     /// </summary>
+     /// <param name="e">the cause</param>
+     public WakeRuntimeException(Exception e)
+         : base("Runtime Exception", e)
+     {
+     }
+
+     /// <summary>
+     /// Deserialization constructor. The type is marked <c>[Serializable]</c>; without this
+     /// constructor the runtime serializer cannot rebuild an instance from serialized data.
+     /// </summary>
+     /// <param name="info">the serialized object data</param>
+     /// <param name="context">the source or destination of the serialized stream</param>
+     protected WakeRuntimeException(
+         System.Runtime.Serialization.SerializationInfo info,
+         System.Runtime.Serialization.StreamingContext context)
+         : base(info, context)
+     {
+     }
+ }
+}