You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:42:55 UTC
[11/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code
base
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/Diagnostics/Exceptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/Diagnostics/Exceptions.cs b/lang/cs/Source/Utilities/Diagnostics/Exceptions.cs
new file mode 100644
index 0000000..1308144
--- /dev/null
+++ b/lang/cs/Source/Utilities/Diagnostics/Exceptions.cs
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using System.Text;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Utilities.Diagnostics
+{
+ public static class Exceptions
+ {
+ #region methods
+ /// <summary>
+ /// Logs the given exception at error level and then throws it.
+ /// </summary>
+ /// <remarks>
+ /// The log entry consists of <c>DiagnosticsMessages.ExceptionThrowing</c>, the exception
+ /// type name and <paramref name="message"/>; the exception itself is handed to the logger.
+ /// If <paramref name="logger"/> is null the entry is written to the console instead.
+ /// </remarks>
+ /// <example>
+ /// Exceptions.Throw(new InvalidOperationException("Some exception"), "additional context", LOGGER);
+ /// </example>
+ /// <param name="exception">The exception to be thrown.</param>
+ /// <param name="message">The message from the caller class.</param>
+ /// <param name="logger">The logger from the caller class.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
+ public static void Throw(Exception exception, string message, Logger logger)
+ {
+ LogException(DiagnosticsMessages.ExceptionThrowing, exception, Level.Error, message, logger);
+ throw exception;
+ }
+
+ /// <summary>
+ /// Logs the given exception at error level, without an additional message, and then throws it.
+ /// </summary>
+ /// <example>
+ /// Exceptions.Throw(new InvalidOperationException("Some exception"), LOGGER);
+ /// </example>
+ /// <param name="exception">The exception to be thrown.</param>
+ /// <param name="logger">The logger of the caller class.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
+ public static void Throw(Exception exception, Logger logger)
+ {
+ Throw(exception, string.Empty, logger);
+ }
+
+ /// <summary>
+ /// Call this method every time an exception is caught, to log it at the given level.
+ /// </summary>
+ /// <example>
+ /// try
+ /// {
+ /// // Some code that can throw
+ /// }
+ /// catch (Exception e)
+ /// {
+ /// Exceptions.Caught(e, Level.Error, LOGGER);
+ /// // Exception handling code
+ /// }
+ /// </example>
+ /// <param name="exception">The exception being caught.</param>
+ /// <param name="level">The log level.</param>
+ /// <param name="logger">The logger from the caller class.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
+ public static void Caught(Exception exception, Level level, Logger logger)
+ {
+ Caught(exception, level, string.Empty, logger);
+ }
+
+ /// <summary>
+ /// Call this method every time an exception is caught, to log it at the given level
+ /// together with an additional message.
+ /// </summary>
+ /// <remarks>
+ /// The log entry consists of <c>DiagnosticsMessages.ExceptionCaught</c>, the exception
+ /// type name and <paramref name="message"/>; the exception itself is handed to the logger.
+ /// If <paramref name="logger"/> is null the entry is written to the console instead.
+ /// The exception is not rethrown.
+ /// </remarks>
+ /// <example>
+ /// try
+ /// {
+ /// // Some code that can throw
+ /// }
+ /// catch (Exception e)
+ /// {
+ /// Exceptions.Caught(e, Level.Warning, "while doing something", LOGGER);
+ /// // Exception handling code
+ /// }
+ /// </example>
+ /// <param name="exception">The exception being caught.</param>
+ /// <param name="level">The log level.</param>
+ /// <param name="message">The additional message to log.</param>
+ /// <param name="logger">The logger from the caller class.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
+ public static void Caught(Exception exception, Level level, string message, Logger logger)
+ {
+ LogException(DiagnosticsMessages.ExceptionCaught, exception, level, message, logger);
+ }
+
+ /// <summary>
+ /// Logs a caught exception at the given level and then rethrows it.
+ /// </summary>
+ /// <param name="exception">The exception being caught.</param>
+ /// <param name="level">The log level.</param>
+ /// <param name="logger">The logger from the caller class.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
+ public static void CaughtAndThrow(Exception exception, Level level, Logger logger)
+ {
+ CaughtAndThrow(exception, level, string.Empty, logger);
+ }
+
+ /// <summary>
+ /// Logs a caught exception at the given level, together with an additional message,
+ /// and then rethrows it.
+ /// </summary>
+ /// <remarks>
+ /// The exception is rethrown with the stack trace it had when it was caught.
+ /// </remarks>
+ /// <param name="exception">The exception being caught.</param>
+ /// <param name="level">The log level.</param>
+ /// <param name="message">The additional message to log.</param>
+ /// <param name="logger">The logger from the caller class.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
+ public static void CaughtAndThrow(Exception exception, Level level, string message, Logger logger)
+ {
+ Caught(exception, level, message, logger);
+
+ // "throw exception;" would replace the stack trace recorded at the original throw site
+ // with one starting here; ExceptionDispatchInfo rethrows the same instance and keeps it.
+ global::System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw();
+ }
+
+ /// <summary>
+ /// This method returns true if the exception passed as parameter is a critical exception
+ /// that should not have been caught. The exceptions treated as critical are
+ /// OutOfMemoryException and StackOverflowException.
+ /// </summary>
+ /// <remarks>
+ /// Catch statements which catch all exceptions must call this method immediately and rethrow
+ /// without further processing if the method returns true.
+ /// </remarks>
+ /// <example>
+ /// try
+ /// {
+ /// // Some code that can throw
+ /// }
+ /// catch (Exception e)
+ /// {
+ /// if (Exceptions.MustRethrow(e))
+ /// {
+ /// throw;
+ /// }
+ /// // Exception handling code
+ /// }
+ /// </example>
+ /// <param name="exception">The exception to be checked.</param>
+ /// <returns>True if the exception is a critical one and should not be caught, false otherwise.</returns>
+ public static bool MustRethrow(Exception exception)
+ {
+ return (exception is OutOfMemoryException ||
+ exception is StackOverflowException);
+ }
+
+ /// <summary>
+ /// Gets an exception message that includes the messages of the inner exceptions.
+ /// </summary>
+ /// <remarks>
+ /// The messages are listed from the outermost exception inwards, separated by "-->".
+ /// </remarks>
+ /// <param name="e">The exception. A null value yields an empty string.</param>
+ /// <returns>The combined message.</returns>
+ public static string GetFullMessage(Exception e)
+ {
+ var fullMessage = new StringBuilder();
+ bool firstLevel = true;
+ while (e != null)
+ {
+ if (firstLevel)
+ {
+ firstLevel = false;
+ }
+ else
+ {
+ fullMessage.Append("-->");
+ }
+ fullMessage.Append(e.Message);
+ e = e.InnerException;
+ }
+
+ return fullMessage.ToString();
+ }
+
+ /// <summary>
+ /// Call this method to throw ArgumentException for an invalid argument.
+ /// </summary>
+ /// <param name="argumentName">The invalid argument name.</param>
+ /// <param name="message">A message explaining the reason for the exception.</param>
+ /// <param name="logger">The logger of the caller class.</param>
+ public static void ThrowInvalidArgument(string argumentName, string message, Logger logger)
+ {
+ Throw(new ArgumentException(message, argumentName), logger);
+ }
+
+ /// <summary>
+ /// Call this method to throw ArgumentOutOfRangeException.
+ /// </summary>
+ /// <param name="argumentName">The invalid argument name.</param>
+ /// <param name="message">A message explaining the reason for the exception.</param>
+ /// <param name="logger">The logger of the caller class.</param>
+ public static void ThrowArgumentOutOfRange(string argumentName, string message, Logger logger)
+ {
+ Throw(new ArgumentOutOfRangeException(argumentName, message), logger);
+ }
+
+ /// <summary>
+ /// Call this method to check if an argument is null and throw ArgumentNullException if it is.
+ /// </summary>
+ /// <param name="argument">The argument to be checked.</param>
+ /// <param name="name">The name of the argument.</param>
+ /// <param name="logger">The logger of the caller class.</param>
+ public static void ThrowIfArgumentNull(object argument, string name, Logger logger)
+ {
+ if (argument == null)
+ {
+ Exceptions.Throw(new ArgumentNullException(name), logger);
+ }
+ }
+
+ /// <summary>
+ /// Call this method to throw ObjectDisposedException if an object is disposed.
+ /// </summary>
+ /// <remarks>
+ /// All disposable objects should check their state and throw in the beginning of each public method.
+ /// This helper method provides a shorter way to do this.
+ /// </remarks>
+ /// <example>
+ /// class SomeClass : IDisposable
+ /// {
+ /// bool _disposed;
+ /// // ...
+ /// public void SomePublicMethod()
+ /// {
+ /// Exceptions.ThrowIfObjectDisposed(_disposed, this, LOGGER);
+ /// // Method's code
+ /// }
+ /// }
+ /// </example>
+ /// <param name="disposed">True if the object is disposed.</param>
+ /// <param name="o">The object; its type name is reported as the disposed object's name.</param>
+ /// <param name="logger">The logger of the caller class.</param>
+ public static void ThrowIfObjectDisposed(bool disposed, object o, Logger logger)
+ {
+ if (disposed)
+ {
+ // Do not let a null reference hide the ObjectDisposedException the caller asked for.
+ string objectName = o == null ? string.Empty : o.GetType().Name;
+ Throw(new ObjectDisposedException(objectName), logger);
+ }
+ }
+
+ /// <summary>
+ /// Builds the log entry shared by the Throw and Caught helpers and writes it to the
+ /// logger, or to the console when no logger is available.
+ /// </summary>
+ /// <param name="prefix">The text that starts the log entry.</param>
+ /// <param name="exception">The exception being reported.</param>
+ /// <param name="level">The log level.</param>
+ /// <param name="message">The additional message from the caller; may be null or empty.</param>
+ /// <param name="logger">The logger to use; may be null.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
+ private static void LogException(string prefix, Exception exception, Level level, string message, Logger logger)
+ {
+ if (exception == null)
+ {
+ throw new ArgumentNullException("exception");
+ }
+
+ string logMessage = string.Concat(prefix, " ", exception.GetType().Name, " ", message);
+ if (logger == null)
+ {
+ // logMessage already ends with a space when there is no caller message; otherwise
+ // add one so the caller message and the exception message do not run together.
+ string separator = string.IsNullOrEmpty(message) ? string.Empty : " ";
+ Console.WriteLine("Exception caught before logger is initiated, error message: " + logMessage + separator + exception.Message);
+ }
+ else
+ {
+ logger.Log(level, logMessage, exception);
+ }
+ }
+ #endregion
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/IIdentifiable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/IIdentifiable.cs b/lang/cs/Source/Utilities/IIdentifiable.cs
new file mode 100644
index 0000000..798f53c
--- /dev/null
+++ b/lang/cs/Source/Utilities/IIdentifiable.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Utilities
+{
+ /// <summary>
+ /// Implemented by types that expose a readable and writable string identifier.
+ /// </summary>
+ public interface IIdentifiable
+ {
+ /// <summary>
+ /// Gets or sets the identifier.
+ /// </summary>
+ string Id { get; set; }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/IMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/IMessage.cs b/lang/cs/Source/Utilities/IMessage.cs
new file mode 100644
index 0000000..35ed55d
--- /dev/null
+++ b/lang/cs/Source/Utilities/IMessage.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Utilities
+{
+ /// <summary>
+ /// A message from a REEF component
+ /// </summary>
+ public interface IMessage
+ {
+ /// <summary>
+ /// Gets the message payload.
+ /// </summary>
+ /// <value>The payload as a byte array.</value>
+ byte[] Message { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/Logging/JavaLoggingSetting.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/Logging/JavaLoggingSetting.cs b/lang/cs/Source/Utilities/Logging/JavaLoggingSetting.cs
new file mode 100644
index 0000000..cabf1a7
--- /dev/null
+++ b/lang/cs/Source/Utilities/Logging/JavaLoggingSetting.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Utilities.Logging
+{
+ /// <summary>
+ /// Selects how verbose the Java-side logging is and whether verbose logs are
+ /// transferred to the CLR.
+ /// </summary>
+ public enum JavaLoggingSetting
+ {
+ /// <summary>
+ /// info level log, and not transferred to CLR
+ /// </summary>
+ INFO = 0,
+
+ /// <summary>
+ /// verbose log, but not to CLR
+ /// </summary>
+ VERBOSE = 1,
+
+ /// <summary>
+ /// verbose log, transferred to CLR
+ /// </summary>
+ VERBOSE_TO_CLR = 2,
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/Logging/Level.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/Logging/Level.cs b/lang/cs/Source/Utilities/Logging/Level.cs
new file mode 100644
index 0000000..71dd62f
--- /dev/null
+++ b/lang/cs/Source/Utilities/Logging/Level.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Utilities.Logging
+{
+ /// <summary>
+ /// Log levels, ordered from least verbose (Off) to most verbose (Verbose).
+ /// </summary>
+ /// <remarks>
+ /// Logger compares levels numerically and uses the numeric value as an index into its
+ /// table of level names, so the values must stay contiguous and start at zero.
+ /// </remarks>
+ public enum Level
+ {
+ /// <summary>
+ /// Output no tracing and debugging messages.
+ /// </summary>
+ Off = 0,
+
+ /// <summary>
+ /// Output error-handling messages.
+ /// </summary>
+ Error = 1,
+
+ /// <summary>
+ /// Output warnings and error-handling messages.
+ /// </summary>
+ Warning = 2,
+
+ /// <summary>
+ /// Trace a start event
+ /// </summary>
+ Start = 3,
+
+ /// <summary>
+ /// Trace a stop event
+ /// </summary>
+ Stop = 4,
+
+ /// <summary>
+ /// Output informational messages, warnings, and error-handling messages.
+ /// </summary>
+ Info = 5,
+
+ /// <summary>
+ /// Output all debugging and tracing messages.
+ /// </summary>
+ Verbose = 6,
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/Logging/Logger.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/Logging/Logger.cs b/lang/cs/Source/Utilities/Logging/Logger.cs
new file mode 100644
index 0000000..99ea80f
--- /dev/null
+++ b/lang/cs/Source/Utilities/Logging/Logger.cs
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Utilities.Logging
+{
+ /// <summary>
+ /// A named logger that writes level-filtered messages to a TraceSource.
+ /// </summary>
+ /// <remarks>
+ /// The level threshold and the list of custom trace listeners are static and therefore
+ /// shared by all loggers. Listeners are copied into a logger when it is created.
+ /// </remarks>
+ public class Logger
+ {
+ // Display names, indexed by the numeric value of Level.
+ private static readonly string[] LogLevel = new string[]
+ {
+ "OFF",
+ "ERROR",
+ "WARNING",
+ "START",
+ "EXIT",
+ "INFO",
+ "VERBOSE"
+ };
+
+ // The trace event type reported to the TraceSource for each Level.
+ private static readonly Dictionary<Level, TraceEventType> EventTypes
+ = new Dictionary<Level, TraceEventType>()
+ {
+ { Level.Off, TraceEventType.Stop },
+ { Level.Error, TraceEventType.Error },
+ { Level.Warning, TraceEventType.Warning },
+ { Level.Start, TraceEventType.Start },
+ { Level.Stop, TraceEventType.Stop },
+ { Level.Info, TraceEventType.Information },
+ { Level.Verbose, TraceEventType.Verbose },
+ };
+
+ private static readonly List<TraceListener> _traceListeners = new List<TraceListener>();
+
+ private static Level _customLevel = Level.Verbose;
+
+ private readonly string _name;
+
+ private readonly TraceSource _traceSource;
+
+ /// <summary>
+ /// Creates a logger backed by a new TraceSource with the given name.
+ /// </summary>
+ /// <param name="name">The name of the trace source.</param>
+ private Logger(string name)
+ {
+ _name = name;
+ _traceSource = new TraceSource(_name, SourceLevels.All);
+ if (TraceListeners.Count == 0)
+ {
+ // before customized listener is added, we would need to log to console
+ _traceSource.Listeners.Add(new ConsoleTraceListener());
+ }
+ else
+ {
+ _traceSource.Listeners.Clear();
+ foreach (TraceListener listener in TraceListeners)
+ {
+ _traceSource.Listeners.Add(listener);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the level threshold shared by all loggers. A message is written only
+ /// when its level is numerically less than or equal to this value.
+ /// </summary>
+ /// <remarks>
+ /// The property name is misspelled; it is kept as is so existing callers keep compiling.
+ /// </remarks>
+ public static Level CustcomLevel
+ {
+ get
+ {
+ return _customLevel;
+ }
+
+ set
+ {
+ _customLevel = value;
+ }
+ }
+
+ /// <summary>
+ /// Gets the custom trace listeners that are attached to loggers created from now on.
+ /// </summary>
+ public static List<TraceListener> TraceListeners
+ {
+ get
+ {
+ return _traceListeners;
+ }
+ }
+
+ /// <summary>
+ /// Sets the level threshold shared by all loggers.
+ /// </summary>
+ /// <param name="customLevel">The new threshold.</param>
+ public static void SetCustomLevel(Level customLevel)
+ {
+ _customLevel = customLevel;
+ }
+
+ /// <summary>
+ /// Adds a trace listener to the list used by loggers created after this call.
+ /// </summary>
+ /// <remarks>
+ /// The method name is misspelled; it is kept as is so existing callers keep compiling.
+ /// </remarks>
+ /// <param name="listener">The listener to add.</param>
+ public static void AddTraceListner(TraceListener listener)
+ {
+ TraceListeners.Add(listener);
+ }
+
+ /// <summary>
+ /// Creates a logger named after the full name of the given type.
+ /// </summary>
+ /// <param name="type">The type whose full name becomes the logger name.</param>
+ /// <returns>A new logger.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="type"/> is null.</exception>
+ public static Logger GetLogger(Type type)
+ {
+ if (type == null)
+ {
+ throw new ArgumentNullException("type");
+ }
+ return GetLogger(type.FullName);
+ }
+
+ /// <summary>
+ /// Creates a logger with the given name. Every call returns a new instance.
+ /// </summary>
+ /// <param name="name">The logger name.</param>
+ /// <returns>A new logger.</returns>
+ public static Logger GetLogger(string name)
+ {
+ return new Logger(name);
+ }
+
+ /// <summary>
+ /// Log the message with the specified Log Level.
+ ///
+ /// If additional arguments are passed, the message will be treated as
+ /// a format string. The format string and the additional arguments
+ /// will be formatted according to string.Format()
+ /// </summary>
+ /// <remarks>
+ /// Nothing is written when <paramref name="level"/> is above the shared threshold.
+ /// Each entry is prefixed with a timestamp, the managed thread id and the level name.
+ /// </remarks>
+ /// <param name="level">The level of the message.</param>
+ /// <param name="formatStr">The message, or a format string when arguments are given.</param>
+ /// <param name="args">Optional format arguments.</param>
+ public void Log(Level level, string formatStr, params object[] args)
+ {
+ if (CustcomLevel >= level)
+ {
+ string msg = FormatMessage(formatStr, args);
+ string logMessage =
+ DateTime.Now.ToString("o", CultureInfo.InvariantCulture)
+ + " "
+ + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString("D4", CultureInfo.InvariantCulture)
+ + Environment.NewLine + LogLevel[(int)level] + ": "
+ + msg;
+
+ _traceSource.TraceEvent(
+ EventTypes[level],
+ 0, // we don't use event id for now, but this can be useful for e2e logging later
+ logMessage);
+ }
+ }
+
+ /// <summary>
+ /// Logs a message followed by a description of an exception (the exception itself, its
+ /// message and its stack trace).
+ /// </summary>
+ /// <param name="level">The level of the message.</param>
+ /// <param name="msg">The message; it is never treated as a format string.</param>
+ /// <param name="exception">The exception to describe; when null only the message is logged.</param>
+ public void Log(Level level, string msg, Exception exception)
+ {
+ if (CustcomLevel < level)
+ {
+ // Filtered out anyway: skip formatting the exception and its stack trace.
+ return;
+ }
+
+ if (exception == null)
+ {
+ Log(level, msg);
+ return;
+ }
+
+ string exceptionLog = string.Format(
+ CultureInfo.InvariantCulture,
+ "encountered error [{0}] with mesage [{1}] and stack trace [{2}]",
+ exception,
+ exception.Message,
+ exception.StackTrace);
+
+ // Keep the caller's message and the exception text apart unless the message
+ // already ends with whitespace.
+ bool needsSeparator = !string.IsNullOrEmpty(msg) && !char.IsWhiteSpace(msg[msg.Length - 1]);
+ Log(level, msg + (needsSeparator ? " " : string.Empty) + exceptionLog);
+ }
+
+ /// <summary>
+ /// Starts a logging scope for a function; equivalent to LogScope.
+ /// </summary>
+ /// <param name="function">The scope description, or a format string when arguments are given.</param>
+ /// <param name="args">Optional format arguments.</param>
+ /// <returns>An object that logs the end of the scope when disposed.</returns>
+ public IDisposable LogFunction(string function, params object[] args)
+ {
+ return LogScope(function, args);
+ }
+
+ /// <summary>
+ /// Starts a logging scope: a start entry is logged immediately and a stop entry,
+ /// including the elapsed time, is logged when the returned object is disposed.
+ /// </summary>
+ /// <param name="format">The scope description, or a format string when arguments are given.</param>
+ /// <param name="args">Optional format arguments.</param>
+ /// <returns>An object that logs the end of the scope when disposed.</returns>
+ public IDisposable LogScope(string format, params object[] args)
+ {
+ return new LoggingScope(this, DateTime.Now + " " + format, args);
+ }
+
+ /// <summary>
+ /// Applies string.Format when arguments are supplied; otherwise returns the text unchanged.
+ /// </summary>
+ /// <param name="formatStr">The message or format string.</param>
+ /// <param name="args">The format arguments; may be null or empty.</param>
+ /// <returns>The message to log.</returns>
+ private string FormatMessage(string formatStr, params object[] args)
+ {
+ // args is null when a caller passes an explicit null array.
+ return args != null && args.Length > 0 ? string.Format(CultureInfo.CurrentCulture, formatStr, args) : formatStr;
+ }
+
+ /// <summary>
+ /// Represents a logging scope.
+ /// </summary>
+ /// <remarks>
+ /// A start log is written when an instance is created
+ /// and a stop trace is written when the instance is disposed.
+ /// </remarks>
+ private sealed class LoggingScope : IDisposable
+ {
+ private readonly Stopwatch _stopWatch;
+
+ private readonly Logger _logger;
+
+ private readonly string _content;
+
+ /// <summary>
+ /// Initializes a new instance of the LoggingScope class, starts timing and logs the start entry.
+ /// </summary>
+ /// <param name="logger">The logger that receives the start and stop entries.</param>
+ /// <param name="format">The scope description, or a format string when arguments are given.</param>
+ /// <param name="args">The format arguments; may be null or empty.</param>
+ public LoggingScope(Logger logger, string format, params object[] args)
+ {
+ _logger = logger;
+
+ _stopWatch = Stopwatch.StartNew();
+
+ _content = args != null && args.Length > 0 ? string.Format(CultureInfo.InvariantCulture, format, args) : format;
+
+ _logger.Log(Level.Start, _content);
+ }
+
+ /// <summary>
+ /// Logs the end of a scope.
+ /// </summary>
+ public void Dispose()
+ {
+ // Stop first so the reported duration does not include the logging call itself.
+ _stopWatch.Stop();
+ _logger.Log(Level.Stop, string.Format(CultureInfo.InvariantCulture, "{0}. Duration: [{1}].", _content, _stopWatch.Elapsed));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/NetUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/NetUtilities.cs b/lang/cs/Source/Utilities/NetUtilities.cs
new file mode 100644
index 0000000..5a56645
--- /dev/null
+++ b/lang/cs/Source/Utilities/NetUtilities.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+using System.Net;
+
+namespace Org.Apache.Reef.Utilities
+{
+ /// <summary>
+ /// Network-related helper methods.
+ /// </summary>
+ public class NetUtilities
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(NetUtilities));
+
+ /// <summary>
+ /// Parses text of the form "address:port" into an IPEndPoint.
+ /// </summary>
+ /// <remarks>
+ /// Surrounding whitespace is ignored. Text that starts with a digit is given a
+ /// "socket://" scheme so that Uri can split it into host and port; other text is
+ /// passed to Uri unchanged. The host part must be a literal IP address.
+ /// </remarks>
+ /// <param name="ipWithPort">The text to parse.</param>
+ /// <returns>The parsed end point.</returns>
+ /// <exception cref="FormatException">
+ /// The text is null, empty or whitespace, its host part is not an IP address, or no
+ /// port can be determined. The Uri constructor may also throw for text it cannot parse.
+ /// </exception>
+ public static IPEndPoint ParseIpEndpoint(string ipWithPort)
+ {
+ if (string.IsNullOrWhiteSpace(ipWithPort))
+ {
+ // Report missing input like any other malformed input instead of failing later
+ // with a NullReferenceException or IndexOutOfRangeException.
+ Exceptions.Throw(new FormatException("invalid format for ip: " + ipWithPort), LOGGER);
+ }
+
+ string ip = ipWithPort.Trim();
+ if (char.IsDigit(ip[0]))
+ {
+ ip = @"socket://" + ip;
+ }
+ Uri uri = new Uri(ip);
+ string driverAddress = uri.Host;
+ int driverCommunicationPort = uri.Port;
+ IPAddress ipAddress;
+
+ // Uri.Port is -1 when the text has no port and the scheme has no default one;
+ // IPEndPoint would reject that value with an ArgumentOutOfRangeException.
+ if (!IPAddress.TryParse(driverAddress, out ipAddress) || driverCommunicationPort < 0)
+ {
+ Exceptions.Throw(new FormatException("invalid format for ip: " + ipWithPort), LOGGER);
+ }
+
+ return new IPEndPoint(ipAddress, driverCommunicationPort);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/Optional.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/Optional.cs b/lang/cs/Source/Utilities/Optional.cs
new file mode 100644
index 0000000..2d85ff4
--- /dev/null
+++ b/lang/cs/Source/Utilities/Optional.cs
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Utilities
+{
+ /// <summary>
+ /// A container that either holds a value or is empty.
+ /// </summary>
+ /// <remarks>
+ /// Emptiness is represented by a null value. For a value type T the stored value is never
+ /// null, so IsPresent() returns true even for an instance created with Empty().
+ /// </remarks>
+ /// <typeparam name="T">The type of the contained value.</typeparam>
+ [Serializable]
+ public sealed class Optional<T>
+ {
+ private readonly T _value;
+
+ private Optional(T value)
+ {
+ _value = value;
+ }
+
+ private Optional()
+ {
+ _value = default(T);
+ }
+
+ /// <summary>
+ /// Gets the contained value, or default(T) when the instance was created empty.
+ /// </summary>
+ public T Value
+ {
+ get { return _value; }
+ }
+
+ /// <summary>
+ /// Wraps a value that must not be null.
+ /// </summary>
+ /// <param name="value">The value to wrap.</param>
+ /// <returns>An instance holding the value.</returns>
+ /// <exception cref="ArgumentNullException">The value is null.</exception>
+ public static Optional<T> Of(T value)
+ {
+ if (value == null)
+ {
+ Diagnostics.Exceptions.Throw(new ArgumentNullException("value", "Passed a null value. Use OfNullable() instead"), Logger.GetLogger(typeof(Optional<T>)));
+ }
+ return new Optional<T>(value);
+ }
+
+ /// <summary>
+ /// Creates an instance that holds default(T).
+ /// </summary>
+ /// <returns>A new empty instance.</returns>
+ public static Optional<T> Empty()
+ {
+ return new Optional<T>();
+ }
+
+ /// <summary>
+ /// Wraps a value that may be null; a null value yields an empty instance.
+ /// </summary>
+ /// <param name="value">The value to wrap, or null.</param>
+ /// <returns>An instance holding the value, or an empty instance.</returns>
+ public static Optional<T> OfNullable(T value)
+ {
+ return value == null ? Empty() : Of(value);
+ }
+
+ /// <summary>
+ /// Returns the contained value when present, otherwise the supplied alternative.
+ /// </summary>
+ /// <param name="other">The value to return when nothing is present.</param>
+ /// <returns>The contained value or <paramref name="other"/>.</returns>
+ public T OrElse(T other)
+ {
+ return IsPresent() ? Value : other;
+ }
+
+ /// <summary>
+ /// Reports whether the stored value is non-null.
+ /// </summary>
+ /// <returns>True when the stored value is not null.</returns>
+ public bool IsPresent()
+ {
+ return _value != null;
+ }
+
+ /// <summary>
+ /// Two instances are equal when they have the same runtime type and their values are
+ /// both null or equal according to the value's Equals method.
+ /// </summary>
+ /// <param name="obj">The object to compare with.</param>
+ /// <returns>True when the objects are equal.</returns>
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+ if (obj == null || GetType() != obj.GetType())
+ {
+ return false;
+ }
+
+ T theirs = ((Optional<T>)obj).Value;
+ if (_value == null)
+ {
+ return theirs == null;
+ }
+ return _value.Equals(theirs);
+ }
+
+ /// <summary>
+ /// Returns the hash code of the stored value, or 0 when it is null.
+ /// </summary>
+ /// <returns>The hash code.</returns>
+ public override int GetHashCode()
+ {
+ if (_value == null)
+ {
+ return 0;
+ }
+ return _value.GetHashCode();
+ }
+
+ /// <summary>
+ /// Returns a text of the form "Optional{value=...}".
+ /// </summary>
+ /// <returns>The text representation.</returns>
+ public override string ToString()
+ {
+ return string.Concat("Optional{value=", _value, "}");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/Properties/AssemblyInfo.cs b/lang/cs/Source/Utilities/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..50e52c5
--- /dev/null
+++ b/lang/cs/Source/Utilities/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Utilities")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Utilities")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("a7bda51a-552a-4fba-a834-f715c19454ab")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/Utilities.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/Utilities.csproj b/lang/cs/Source/Utilities/Utilities.csproj
new file mode 100644
index 0000000..f3fd129
--- /dev/null
+++ b/lang/cs/Source/Utilities/Utilities.csproj
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{79E7F89A-1DFB-45E1-8D43-D71A954AEB98}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.Reef.Utilities</RootNamespace>
+ <AssemblyName>Org.Apache.Reef.Utilities</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
+ <RestorePackages>true</RestorePackages>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>..\..\bin\Debug\Org.Apache.Reef.Utilities\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>..\..\bin\Release\Org.Apache.Reef.Utilities\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Microsoft.Hadoop.Avro">
+ <HintPath>..\..\packages\Microsoft.Hadoop.Avro.1.4.0.0\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
+ <Reference Include="Newtonsoft.Json">
+ <HintPath>..\..\packages\Newtonsoft.Json.6.0.8\lib\net45\Newtonsoft.Json.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="AvroUtils.cs" />
+ <Compile Include="ByteUtilities.cs" />
+ <Compile Include="Diagnostics\DiagnosticsMessages.cs" />
+ <Compile Include="Diagnostics\Exceptions.cs" />
+ <Compile Include="IIdentifiable.cs" />
+ <Compile Include="IMessage.cs" />
+ <Compile Include="Logging\JavaLoggingSetting.cs" />
+ <Compile Include="Logging\Level.cs" />
+ <Compile Include="Logging\Logger.cs" />
+ <Compile Include="NetUtilities.cs" />
+ <Compile Include="Optional.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="ValidationUtilities.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/ValidationUtilities.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/ValidationUtilities.cs b/lang/cs/Source/Utilities/ValidationUtilities.cs
new file mode 100644
index 0000000..a452f1f
--- /dev/null
+++ b/lang/cs/Source/Utilities/ValidationUtilities.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Utilities
+{
+ public class ValidationUtilities
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(ValidationUtilities));
+
+     /// <summary>
+     /// Reads an environment variable and insists that it has a non-blank value.
+     /// </summary>
+     /// <param name="env">Name of the environment variable to read</param>
+     /// <returns>The value of the variable when it is set and not blank</returns>
+     /// <remarks>
+     /// When the variable is missing or blank, a notice is written to the console and an
+     /// InvalidOperationException is handed to Diagnostics.Exceptions.Throw together with LOGGER.
+     /// </remarks>
+     public static string ValidateEnvVariable(string env)
+     {
+         string value = Environment.GetEnvironmentVariable(env);
+         if (!string.IsNullOrWhiteSpace(value))
+         {
+             return value;
+         }
+
+         Console.WriteLine(string.Format(CultureInfo.InvariantCulture, "{0} not set. Please set the environment variable first. Exiting...", env));
+         string notFound = string.Format(CultureInfo.InvariantCulture, "No {0} found.", env);
+         Diagnostics.Exceptions.Throw(new InvalidOperationException(notFound), notFound, LOGGER);
+         return value;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/Utilities/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Source/Utilities/packages.config b/lang/cs/Source/Utilities/packages.config
new file mode 100644
index 0000000..c60eef8
--- /dev/null
+++ b/lang/cs/Source/Utilities/packages.config
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+ <package id="Microsoft.Hadoop.Avro" version="1.4.0.0" targetFramework="net45" />
+ <package id="Newtonsoft.Json" version="6.0.8" targetFramework="net45" />
+</packages>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/AbstractEStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/AbstractEStage.cs b/lang/cs/Source/WAKE/Wake/AbstractEStage.cs
new file mode 100644
index 0000000..c2b1188
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/AbstractEStage.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake
+{
+ /// <summary>
+ /// Base class for estages; intended to provide metering, which is not implemented here yet
+ /// </summary>
+ /// <typeparam name="T">The type of event the stage receives</typeparam>
+ public abstract class AbstractEStage<T> : IEStage<T>
+ {
+ /// <summary>Constructs an abstract estage</summary>
+ /// <param name="meterName">the meter name (currently unused: the constructor body is empty)</param>
+ protected AbstractEStage(string meterName)
+ {
+ }
+
+ /// <summary>Receives an event; the base implementation is empty, so no meter is updated</summary>
+ /// <param name="value">an event</param>
+ public virtual void OnNext(T value)
+ {
+ }
+
+ public abstract void Dispose(); // every concrete stage must supply its own Dispose
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/IEStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/IEStage.cs b/lang/cs/Source/WAKE/Wake/IEStage.cs
new file mode 100644
index 0000000..cc88d26
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/IEStage.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake
+{
+ /// <summary>Stage that executes an event handler; combines <see cref="IEventHandler{T}"/> with <see cref="IStage"/> and adds no members</summary>
+ public interface IEStage<T> : IEventHandler<T>, IStage
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/IEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/IEventHandler.cs b/lang/cs/Source/WAKE/Wake/IEventHandler.cs
new file mode 100644
index 0000000..db6f0c1
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/IEventHandler.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake
+{
+ /// <summary>
+ /// Handler that processes events of a single type
+ /// </summary>
+ /// <typeparam name="T">The type of event</typeparam>
+ public interface IEventHandler<T>
+ {
+ /// <summary>
+ /// Processes one event
+ /// </summary>
+ /// <param name="value">The event to process</param>
+ void OnNext(T value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/IIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/IIdentifier.cs b/lang/cs/Source/WAKE/Wake/IIdentifier.cs
new file mode 100644
index 0000000..38e63aa
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/IIdentifier.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake
+{
+ /// <summary>
+ /// Abstract base class for identifiers in REEF (a class, despite the interface-style name).
+ ///
+ /// Identifiers are a generic naming primitive that carry some information about
+ /// the type of object that they point to.
+ ///
+ /// Examples include remote sockets or filenames.
+ /// </summary>
+ public abstract class IIdentifier
+ {
+ /// <summary>
+ /// Returns a hash code for the object; concrete identifiers must implement it
+ /// </summary>
+ /// <returns>The hash code value for this object</returns>
+ public abstract override int GetHashCode();
+
+ /// <summary>
+ /// Checks that another object is equal to this object; concrete identifiers must implement it
+ /// </summary>
+ /// <param name="o">The object to compare</param>
+ /// <returns>True if the object is the same as the object argument; false, otherwise</returns>
+ public abstract override bool Equals(object o);
+
+ /// <summary>
+ /// Returns a string representation of this object; concrete identifiers must implement it
+ /// </summary>
+ /// <returns>A string representation of this object</returns>
+ public abstract override string ToString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/IIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/IIdentifierFactory.cs b/lang/cs/Source/WAKE/Wake/IIdentifierFactory.cs
new file mode 100644
index 0000000..a06928a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/IIdentifierFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake
+{
+ /// <summary>
+ /// Factory that creates <see cref="IIdentifier"/> instances from strings.
+ /// StringIdentifierFactory is declared as the default implementation via the attribute below.
+ /// </summary>
+ [DefaultImplementation(typeof(StringIdentifierFactory))]
+ public interface IIdentifierFactory
+ {
+ /// <summary>Creates an identifier from a string</summary>
+ /// <param name="s">The string to create the identifier from (expected format depends on the implementation)</param>
+ /// <returns>The created identifier</returns>
+ IIdentifier Create(string s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/IObserverFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/IObserverFactory.cs b/lang/cs/Source/WAKE/Wake/IObserverFactory.cs
new file mode 100644
index 0000000..32242d1
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/IObserverFactory.cs
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake
+{
+ /// <summary>Factory with a single parameterless Create method</summary>
+ public interface IObserverFactory
+ {
+ /// <summary>Creates a new object</summary>
+ /// <returns>The created object; presumably an observer, judging by the interface name, but the declared type is only object - TODO confirm</returns>
+ object Create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/IStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/IStage.cs b/lang/cs/Source/WAKE/Wake/IStage.cs
new file mode 100644
index 0000000..afc72a5
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/IStage.cs
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake
+{
+ /// <summary>Stage is an execution unit for events; as an IDisposable it provides a way to reclaim its resources
+ /// </summary>
+ public interface IStage : IDisposable
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/LoggingEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/LoggingEventHandler.cs b/lang/cs/Source/WAKE/Wake/Impl/LoggingEventHandler.cs
new file mode 100644
index 0000000..35549ae
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/LoggingEventHandler.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>A logging event handler: writes every event it receives to the log</summary>
+ /// <typeparam name="T">The type of event to log</typeparam>
+ public class LoggingEventHandler<T> : IObserver<T>
+ {
+     // NOTE(review): the logger is created for the event type T rather than for this
+     // handler type, unlike the other handlers in this namespace - confirm that is intended.
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(T));
+
+     [Inject]
+     public LoggingEventHandler()
+     {
+     }
+
+     /// <summary>Logs the event at Verbose level, prefixed with the current local time</summary>
+     /// <param name="value">an event</param>
+     public void OnNext(T value)
+     {
+         // The explicit space keeps the timestamp and the event text from running together.
+         LOGGER.Log(Level.Verbose, "Event: " + DateTime.Now + " " + value);
+     }
+
+     /// <summary>Not supported by this handler</summary>
+     /// <param name="error">The reported error; not used</param>
+     /// <exception cref="NotImplementedException">Always thrown</exception>
+     public void OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     /// <summary>Not supported by this handler</summary>
+     /// <exception cref="NotImplementedException">Always thrown</exception>
+     public void OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/MissingStartHandlerHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/MissingStartHandlerHandler.cs b/lang/cs/Source/WAKE/Wake/Impl/MissingStartHandlerHandler.cs
new file mode 100644
index 0000000..60e7fa0
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/MissingStartHandlerHandler.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Time;
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>Observer of StartTime events that ignores every event it receives</summary>
+ public class MissingStartHandlerHandler : IObserver<StartTime>
+ {
+ [Inject]
+ public MissingStartHandlerHandler()
+ {
+ }
+
+ /// <summary>Ignores the start event</summary>
+ /// <param name="value">The start time event; not used</param>
+ public void OnNext(StartTime value)
+ {
+ // Do nothing, since we only use this for evaluator, not for driver.
+ // LOGGER.Log(Level.Info, "No binding to Clock.StartHandler. It is likely that the clock will immediately go idle and close.");
+ }
+
+ /// <summary>Not supported: always throws NotImplementedException</summary>
+ /// <param name="error">The reported error; not used</param>
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not supported: always throws NotImplementedException</summary>
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/MultiEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/MultiEventHandler.cs b/lang/cs/Source/WAKE/Wake/Impl/MultiEventHandler.cs
new file mode 100644
index 0000000..855e459
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/MultiEventHandler.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>Event handler that dispatches an event to a specific handler based on an event class type
+ /// </summary>
+ /// <typeparam name="T">The common base type of the events being dispatched</typeparam>
+ public class MultiEventHandler<T> : IEventHandler<T>
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiEventHandler<T>));
+     private readonly IDictionary<Type, IEventHandler<T>> _map;
+
+     /// <summary>Constructs a multi-event handler</summary>
+     /// <param name="map">a map of class types to event handlers; every key must be assignable to T.
+     /// The dictionary is stored as is, not copied.</param>
+     public MultiEventHandler(IDictionary<Type, IEventHandler<T>> map)
+     {
+         // Report a missing map explicitly instead of failing with a NullReferenceException below.
+         if (map == null)
+         {
+             Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("map"), LOGGER);
+         }
+
+         foreach (Type item in map.Keys)
+         {
+             if (!typeof(T).IsAssignableFrom(item))
+             {
+                 Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException(typeof(T) + " is not assignable from " + item), LOGGER);
+             }
+         }
+         _map = map;
+     }
+
+     /// <summary>
+     /// Invokes a specific handler for the event class type if it exists
+     /// </summary>
+     /// <remarks>
+     /// The lookup uses the exact runtime type of the event; a handler registered
+     /// for a base type is not found for an event of a derived type.
+     /// </remarks>
+     /// <param name="value">The event to handle</param>
+     public void OnNext(T value)
+     {
+         // Same guard as PubSubEventHandler.OnNext: value.GetType() below needs a non-null event.
+         if (value == null)
+         {
+             Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+         }
+
+         IEventHandler<T> handler = null;
+         bool success = _map.TryGetValue(value.GetType(), out handler);
+         if (success)
+         {
+             handler.OnNext(value);
+         }
+         else
+         {
+             Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("No event " + value.GetType() + " handler"), LOGGER);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/PeriodicEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/PeriodicEvent.cs b/lang/cs/Source/WAKE/Wake/Impl/PeriodicEvent.cs
new file mode 100644
index 0000000..417f008
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/PeriodicEvent.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>Periodic event for timers; the class declares no members</summary>
+ public class PeriodicEvent
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/PubSubEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/PubSubEventHandler.cs b/lang/cs/Source/WAKE/Wake/Impl/PubSubEventHandler.cs
new file mode 100644
index 0000000..ba4a5a0
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/PubSubEventHandler.cs
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>
+ /// Event handler to provide publish/subscribe interfaces
+ /// </summary>
+ /// <typeparam name="T">The base type of the events that can be published</typeparam>
+ public class PubSubEventHandler<T> : IEventHandler<T>
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubEventHandler<T>));
+
+     // Maps an event type to the handlers subscribed for exactly that type.
+     // Also serves as the lock object guarding both the map and the handler lists.
+     private readonly Dictionary<Type, List<object>> _classToHandlersMap;
+
+     /// <summary>
+     /// Construct a pub-sub event handler
+     /// </summary>
+     public PubSubEventHandler()
+     {
+         _classToHandlersMap = new Dictionary<Type, List<object>>();
+     }
+
+     /// <summary>
+     /// Subscribe an event handler for an event type
+     /// </summary>
+     /// <typeparam name="U">The type of event the handler is subscribed for</typeparam>
+     /// <param name="handler">The event handler</param>
+     public void Subscribe<U>(IEventHandler<U> handler) where U : T
+     {
+         // Reject a null handler here rather than failing later, at dispatch time.
+         if (handler == null)
+         {
+             Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("handler"), LOGGER);
+         }
+
+         lock (_classToHandlersMap)
+         {
+             List<object> handlers;
+             if (!_classToHandlersMap.TryGetValue(typeof(U), out handlers))
+             {
+                 handlers = new List<object>();
+                 _classToHandlersMap[typeof(U)] = handlers;
+             }
+             handlers.Add(handler);
+         }
+     }
+
+     /// <summary>
+     /// Invoke the subscribed handlers for the event class type
+     /// </summary>
+     /// <remarks>
+     /// Handlers are looked up by the exact runtime type of the event and are invoked
+     /// through reflection while the internal lock is held.
+     /// </remarks>
+     /// <param name="value">The event to process</param>
+     public void OnNext(T value)
+     {
+         if (value == null)
+         {
+             Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+         }
+
+         Type eventType = value.GetType();
+
+         lock (_classToHandlersMap)
+         {
+             // Check that the event type has been subscribed
+             List<object> handlers;
+             if (!_classToHandlersMap.TryGetValue(eventType, out handlers))
+             {
+                 Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException("No event for type " + eventType), LOGGER);
+             }
+
+             // All handlers in the list share the same closed IEventHandler<> type,
+             // so the reflection lookup is done once instead of once per handler.
+             Type handlerType = typeof(IEventHandler<>).MakeGenericType(new[] { eventType });
+             MethodInfo onNext = handlerType.GetMethod("OnNext");
+             object[] arguments = new[] { (object)value };
+
+             // Invoke each handler for the event type
+             foreach (object handler in handlers)
+             {
+                 onNext.Invoke(handler, arguments);
+             }
+         }
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/SingleThreadStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/SingleThreadStage.cs b/lang/cs/Source/WAKE/Wake/Impl/SingleThreadStage.cs
new file mode 100644
index 0000000..163d347
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/SingleThreadStage.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>Single thread stage that runs the event handler</summary>
+ /// <remarks>
+ /// Events passed to OnNext are placed in a bounded queue and handed to the
+ /// handler, one at a time, by a dedicated thread started in the constructor.
+ /// </remarks>
+ public class SingleThreadStage<T> : AbstractEStage<T>
+ {
+     private readonly BlockingCollection<T> queue;
+
+     // Kept so that Dispose can tell the worker loop to stop; the stop flag
+     // lives inside the producer because that is where the loop reads it.
+     private readonly Producer<T> producer;
+
+     private readonly Thread thread;
+
+     /// <summary>Constructs the stage and starts its worker thread</summary>
+     /// <param name="handler">the event handler invoked for each queued event</param>
+     /// <param name="capacity">the maximum number of events held in the queue</param>
+     public SingleThreadStage(IEventHandler<T> handler, int capacity) : base(handler.GetType().FullName)
+     {
+         queue = new BlockingCollection<T>(capacity);
+         producer = new Producer<T>(queue, handler, false);
+         thread = new Thread(new ThreadStart(producer.Run));
+         thread.Start();
+     }
+
+     /// <summary>
+     /// Puts the value to the queue, which will be processed by the handler later.
+     /// If the queue is full, the call blocks until space becomes available.
+     /// </summary>
+     /// <param name="value">the value</param>
+     public override void OnNext(T value)
+     {
+         base.OnNext(value);
+         queue.Add(value);
+     }
+
+     /// <summary>
+     /// Closes the stage
+     /// </summary>
+     public override void Dispose()
+     {
+         // Set the flag before interrupting, so that the worker sees it when
+         // the interruption surfaces in its catch block and leaves the loop.
+         producer.Interrupt();
+         thread.Interrupt();
+     }
+ }
+
+ /// <summary>Takes events from the queue and provides them to the handler</summary>
+ /// <typeparam name="T">The type</typeparam>
+ internal class Producer<T>
+ {
+     private readonly BlockingCollection<T> _queue;
+
+     private readonly IEventHandler<T> _handler;
+
+     // Written by the disposing thread, read by the worker thread.
+     private volatile bool _interrupted;
+
+     /// <summary>Constructs the worker loop state</summary>
+     /// <param name="queue">the queue events are taken from</param>
+     /// <param name="handler">the handler each event is passed to</param>
+     /// <param name="interrupted">initial value of the stop flag</param>
+     internal Producer(BlockingCollection<T> queue, IEventHandler<T> handler, bool interrupted)
+     {
+         _queue = queue;
+         _handler = handler;
+         _interrupted = interrupted;
+     }
+
+     /// <summary>
+     /// Marks the loop as interrupted, so that Run exits the next time an
+     /// exception (such as a thread interruption) is caught.
+     /// </summary>
+     internal void Interrupt()
+     {
+         _interrupted = true;
+     }
+
+     /// <summary>
+     /// Repeatedly takes an event from the queue and passes it to the handler
+     /// until the loop has been marked as interrupted and an exception is caught.
+     /// </summary>
+     public void Run()
+     {
+         while (true)
+         {
+             try
+             {
+                 T value = _queue.Take();
+                 _handler.OnNext(value);
+             }
+             catch (Exception)
+             {
+                 // NOTE(review): exceptions thrown by the handler are discarded
+                 // here without logging; only the stop flag is checked.
+                 if (_interrupted)
+                 {
+                     break;
+                 }
+             }
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/SyncStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/SyncStage.cs b/lang/cs/Source/WAKE/Wake/Impl/SyncStage.cs
new file mode 100644
index 0000000..bfa3fe0
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/SyncStage.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Wake;
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>Stage that runs its event handler directly on the calling thread</summary>
+ public class SyncStage<T> : AbstractEStage<T>
+ {
+     private readonly IEventHandler<T> _handler;
+
+     /// <summary>Creates a stage wrapping the given handler</summary>
+     /// <param name="handler">the handler every event is forwarded to</param>
+     public SyncStage(IEventHandler<T> handler)
+         : base(handler.GetType().FullName)
+     {
+         this._handler = handler;
+     }
+
+     /// <summary>Passes the event to the base stage and then to the handler</summary>
+     /// <param name="value">the event to deliver</param>
+     public override void OnNext(T value)
+     {
+         base.OnNext(value);
+         this._handler.OnNext(value);
+     }
+
+     /// <summary>Does nothing; this stage performs no cleanup</summary>
+     public override void Dispose()
+     {
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/ThreadPoolStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/ThreadPoolStage.cs b/lang/cs/Source/WAKE/Wake/Impl/ThreadPoolStage.cs
new file mode 100644
index 0000000..6054f86
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/ThreadPoolStage.cs
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake;
+using Org.Apache.Reef.Wake.Util;
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>Stage that hands each event to its handler through a task service</summary>
+ public class ThreadPoolStage<T> : AbstractEStage<T>
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(ThreadPoolStage<T>));
+
+     private readonly IEventHandler<T> _handler;
+
+     private readonly ITaskService _taskService;
+
+     // Size of the pool created by this stage; 0 when the task service was
+     // supplied by the caller.
+     private readonly int _numThreads;
+
+     /// <summary>Creates a stage backed by its own fixed-size thread pool</summary>
+     /// <param name="handler">The handler invoked for every event</param>
+     /// <param name="numThreads">The pool size; must be greater than 0</param>
+     public ThreadPoolStage(IEventHandler<T> handler, int numThreads)
+         : base(handler.GetType().FullName)
+     {
+         if (numThreads <= 0)
+         {
+             Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
+         }
+
+         _handler = handler;
+         _numThreads = numThreads;
+         _taskService = new FixedThreadPoolTaskService(numThreads);
+     }
+
+     /// <summary>Creates a stage that runs on an externally provided task service</summary>
+     /// <param name="handler">The handler invoked for every event</param>
+     /// <param name="taskService">The task service used to execute the handler</param>
+     public ThreadPoolStage(IEventHandler<T> handler, ITaskService taskService)
+         : base(handler.GetType().FullName)
+     {
+         _handler = handler;
+         _taskService = taskService;
+         _numThreads = 0;
+     }
+
+     /// <summary>Submits the event to the task service for handling</summary>
+     /// <param name="value">The event to deliver</param>
+     public override void OnNext(T value)
+     {
+         base.OnNext(value);
+
+         // The closure carries the event to whichever thread runs the task.
+         _taskService.Execute(() => _handler.OnNext(value));
+     }
+
+     /// <summary>
+     /// Shuts down the task service, but only when this stage created it
+     /// </summary>
+     public override void Dispose()
+     {
+         bool ownsTaskService = _numThreads > 0;
+         if (ownsTaskService)
+         {
+             _taskService.Shutdown();
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Impl/TimerStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Impl/TimerStage.cs b/lang/cs/Source/WAKE/Wake/Impl/TimerStage.cs
new file mode 100644
index 0000000..3b1e612
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Impl/TimerStage.cs
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Timers;
+
+using Org.Apache.Reef.Wake;
+
+namespace Org.Apache.Reef.Wake.Impl
+{
+ /// <summary>Stage that triggers an event handler periodically</summary>
+ public class TimerStage : IStage
+ {
+     private readonly Timer _timer;
+
+     // A single event instance is reused for every tick.
+     private readonly PeriodicEvent _value = new PeriodicEvent();
+
+     private readonly IEventHandler<PeriodicEvent> _handler;
+
+     /// <summary>Constructs a timer stage; the first event fires one period after construction</summary>
+     /// <param name="handler">an event handler</param>
+     /// <param name="period">a period in milli-seconds</param>
+     public TimerStage(IEventHandler<PeriodicEvent> handler, long period) : this(handler, 0, period)
+     {
+     }
+
+     /// <summary>Constructs a timer stage and starts the timer</summary>
+     /// <param name="handler">an event handler</param>
+     /// <param name="initialDelay">
+     /// an initial delay. NOTE(review): this value is currently not used; the
+     /// timer is created with the period only, so the first event fires after
+     /// one period regardless of this argument.
+     /// </param>
+     /// <param name="period">a period in milli-seconds</param>
+     public TimerStage(IEventHandler<PeriodicEvent> handler, long initialDelay, long period)
+     {
+         _handler = handler;
+         _timer = new Timer(period);
+         _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _handler, _value);
+         _timer.Enabled = true;
+     }
+
+     /// <summary>
+     /// Stops the timer and releases the resources it holds
+     /// </summary>
+     public void Dispose()
+     {
+         _timer.Stop();
+
+         // Stopping alone leaves the underlying timer allocated; dispose it so
+         // that the stage does not leak it.
+         _timer.Dispose();
+     }
+
+     /// <summary>Forwards a timer tick to the handler</summary>
+     /// <param name="source">the timer that raised the event (unused)</param>
+     /// <param name="e">the elapsed-event data (unused)</param>
+     /// <param name="handler">the handler to notify</param>
+     /// <param name="value">the event passed to the handler</param>
+     private static void OnTimedEvent(object source, ElapsedEventArgs e, IEventHandler<PeriodicEvent> handler, PeriodicEvent value)
+     {
+         handler.OnNext(value);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Properties/AssemblyInfo.cs b/lang/cs/Source/WAKE/Wake/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..c0d8070
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+ // General Information about an assembly is controlled through the following
+ // set of attributes. Change these attribute values to modify the information
+ // associated with an assembly.
+ // NOTE(review): description, configuration, company, trademark and culture
+ // are left as empty strings below - confirm whether any should be filled in.
+ [assembly: AssemblyTitle("Wake")]
+ [assembly: AssemblyDescription("")]
+ [assembly: AssemblyConfiguration("")]
+ [assembly: AssemblyCompany("")]
+ [assembly: AssemblyProduct("Wake")]
+ [assembly: AssemblyCopyright("Copyright © 2015")]
+ [assembly: AssemblyTrademark("")]
+ [assembly: AssemblyCulture("")]
+
+ // Setting ComVisible to false makes the types in this assembly not visible
+ // to COM components. If you need to access a type in this assembly from
+ // COM, set the ComVisible attribute to true on that type.
+ [assembly: ComVisible(false)]
+
+ // The following GUID is for the ID of the typelib if this project is exposed to COM
+ [assembly: Guid("86a66ac8-0c8e-4652-b533-670e800cb0ea")]
+
+ // Version information for an assembly consists of the following four values:
+ //
+ // Major Version
+ // Minor Version
+ // Build Number
+ // Revision
+ //
+ // You can specify all the values or you can default the Build and Revision Numbers
+ // by using the '*' as shown below:
+ // [assembly: AssemblyVersion("1.0.*")]
+ // Both the assembly version and the file version are pinned to 1.0.0.0 here.
+ [assembly: AssemblyVersion("1.0.0.0")]
+ [assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Protobuf/RemoteProtocol.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Protobuf/RemoteProtocol.proto b/lang/cs/Source/WAKE/Wake/Protobuf/RemoteProtocol.proto
new file mode 100644
index 0000000..cd28d13
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Protobuf/RemoteProtocol.proto
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ // Java code-generation options for this schema.
+ // NOTE(review): only Java-specific options are declared although this file
+ // lives in the C# source tree - confirm how the C# classes are generated.
+ option java_package = "org.apache.reef.wake.remote.proto";
+ option java_outer_classname = "WakeRemoteProtos";
+ option java_generic_services = true;
+ option java_generate_equals_and_hash = true;
+
+ // A single message: a mandatory byte payload and sequence number, plus
+ // optional source and sink strings.
+ // NOTE(review): field meanings below are inferred from the field names - confirm.
+ message WakeMessagePBuf {
+ // Payload bytes (required).
+ required bytes data = 1;
+ // Sequence number (required); presumably used to order messages.
+ required int64 seq = 2;
+ // Optional; presumably identifies the sender.
+ optional string source = 3;
+ // Optional; presumably identifies the intended receiver.
+ optional string sink = 4;
+ }
+
+ // Pairs a class name with a byte payload.
+ // NOTE(review): presumably className names the type that data decodes to - confirm.
+ message WakeTuplePBuf {
+ // Class name (required).
+ required string className = 1;
+ // Payload bytes (required).
+ required bytes data = 2;
+ }
+