You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:42:52 UTC
[08/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code
base
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/RealTimer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/RealTimer.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/RealTimer.cs
new file mode 100644
index 0000000..2e99202
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/RealTimer.cs
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime
+{
+ /// <summary>
+ /// ITimer implementation that reads the system wall clock.
+ /// </summary>
+ public class RealTimer : ITimer
+ {
+     [Inject]
+     public RealTimer()
+     {
+     }
+
+     /// <summary>
+     /// Gets the current UTC time in milliseconds.
+     /// The value is DateTime.UtcNow.Ticks converted to milliseconds, so it counts
+     /// from 0001-01-01T00:00:00 UTC (the origin of DateTime.Ticks), not from the Unix epoch.
+     /// </summary>
+     /// <remarks>
+     /// UTC is used instead of local time so that daylight-saving or time-zone
+     /// changes cannot shift the durations computed from this value.
+     /// </remarks>
+     public long CurrentTime
+     {
+         get { return DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond; }
+     }
+
+     /// <summary>
+     /// Gets the difference between the given time and the current time
+     /// </summary>
+     /// <param name="time">The time to compare against the current time, on the same scale as CurrentTime</param>
+     /// <returns>time minus CurrentTime in milliseconds; zero or negative once the time has been reached</returns>
+     public long GetDuration(long time)
+     {
+         return time - CurrentTime;
+     }
+
+     /// <summary>
+     /// Checks if the given time has already passed.
+     /// </summary>
+     /// <param name="time">The time to check if it has passed or not</param>
+     /// <returns>True when the remaining duration is zero or negative</returns>
+     public bool IsReady(long time)
+     {
+         return GetDuration(time) <= 0;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/RuntimeClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/RuntimeClock.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/RuntimeClock.cs
new file mode 100644
index 0000000..a8c77af
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/RuntimeClock.cs
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Reactive.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Tang.Exceptions;
+using Org.Apache.Reef.Tang.Implementations;
+using Org.Apache.Reef.Wake.RX.Impl;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+ /// <summary>
+ /// Clock that keeps a time-ordered schedule of events and dispatches them
+ /// from the thread that calls Run until the clock is disposed.
+ /// </summary>
+ public class RuntimeClock : IClock
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(RuntimeClock));
+
+     private readonly ITimer _timer;
+     private readonly PubSubSubject<Time> _handlers;
+
+     // Ordered by Time.CompareTo. The set instance is also the monitor that
+     // guards the schedule contents and the _disposed flag.
+     private readonly SortedSet<Time> _schedule;
+
+     private IInjectionFuture<ISet<IObserver<StartTime>>> _startHandler;
+     private IInjectionFuture<ISet<IObserver<StopTime>>> _stopHandler;
+     private IInjectionFuture<ISet<IObserver<RuntimeStart>>> _runtimeStartHandler;
+     private IInjectionFuture<ISet<IObserver<RuntimeStop>>> _runtimeStopHandler;
+     private IInjectionFuture<ISet<IObserver<IdleClock>>> _idleHandler;
+
+     private bool _disposed;
+
+     /// <summary>
+     /// Create a new RuntimeClock with injectable IObservers
+     /// </summary>
+     /// <param name="timer">The runtime clock timer</param>
+     /// <param name="startHandler">The start handler</param>
+     /// <param name="stopHandler">The stop handler</param>
+     /// <param name="runtimeStartHandler">The runtime start handler</param>
+     /// <param name="runtimeStopHandler">The runtime stop handler</param>
+     /// <param name="idleHandler">The idle handler</param>
+     [Inject]
+     internal RuntimeClock(
+         ITimer timer,
+         [Parameter(typeof(StartHandler))] IInjectionFuture<ISet<IObserver<StartTime>>> startHandler,
+         [Parameter(typeof(StopHandler))] IInjectionFuture<ISet<IObserver<StopTime>>> stopHandler,
+         [Parameter(typeof(RuntimeStartHandler))] IInjectionFuture<ISet<IObserver<RuntimeStart>>> runtimeStartHandler,
+         [Parameter(typeof(RuntimeStopHandler))] IInjectionFuture<ISet<IObserver<RuntimeStop>>> runtimeStopHandler,
+         [Parameter(typeof(IdleHandler))] IInjectionFuture<ISet<IObserver<IdleClock>>> idleHandler)
+     {
+         _timer = timer;
+         _schedule = new SortedSet<Time>();
+         _handlers = new PubSubSubject<Time>();
+
+         _startHandler = startHandler;
+         _stopHandler = stopHandler;
+         _runtimeStartHandler = runtimeStartHandler;
+         _runtimeStopHandler = runtimeStopHandler;
+         _idleHandler = idleHandler;
+     }
+
+     /// <summary>
+     /// Gets or sets the future that supplies the RuntimeStart observers subscribed by Run
+     /// </summary>
+     public IInjectionFuture<ISet<IObserver<RuntimeStart>>> InjectedRuntimeStartHandler
+     {
+         get { return _runtimeStartHandler; }
+         set { _runtimeStartHandler = value; }
+     }
+
+     /// <summary>
+     /// Gets or sets the future that supplies the RuntimeStop observers subscribed by Run
+     /// </summary>
+     public IInjectionFuture<ISet<IObserver<RuntimeStop>>> InjectedRuntimeStopHandler
+     {
+         get { return _runtimeStopHandler; }
+         set { _runtimeStopHandler = value; }
+     }
+
+     /// <summary>
+     /// Schedule a TimerEvent at the given future offset.
+     /// Does nothing once the clock has been disposed.
+     /// </summary>
+     /// <param name="offset">The offset in the future to schedule the alarm</param>
+     /// <param name="handler">The IObserver to to be called</param>
+     public override void ScheduleAlarm(long offset, IObserver<Alarm> handler)
+     {
+         // Cheap early exit; the authoritative check is repeated under the lock below
+         if (_disposed)
+         {
+             return;
+         }
+         if (handler == null)
+         {
+             Exceptions.Throw(new ArgumentNullException("handler"), LOGGER);
+         }
+
+         lock (_schedule)
+         {
+             // Re-check under the lock: Dispose may have run since the check above,
+             // and an alarm added after the StopTime event would never be delivered.
+             if (_disposed)
+             {
+                 return;
+             }
+
+             _schedule.Add(new ClientAlarm(_timer.CurrentTime + offset, handler));
+             // Wake the Run loop so it can re-evaluate which event is due first
+             Monitor.PulseAll(_schedule);
+         }
+     }
+
+     /// <summary>
+     /// Clock is idle if it has no future alarms set
+     /// </summary>
+     /// <returns>True if no future alarms are set, otherwise false</returns>
+     public override bool IsIdle()
+     {
+         lock (_schedule)
+         {
+             return _schedule.Count == 0;
+         }
+     }
+
+     /// <summary>
+     /// Dispose of the clock and all scheduled alarms.
+     /// Pending events are discarded and a single StopTime event is scheduled so
+     /// that the Run loop terminates. Calling Dispose again has no effect.
+     /// </summary>
+     public override void Dispose()
+     {
+         lock (_schedule)
+         {
+             if (_disposed)
+             {
+                 return;
+             }
+
+             _disposed = true;
+             _schedule.Clear();
+             _schedule.Add(new StopTime(_timer.CurrentTime));
+             Monitor.PulseAll(_schedule);
+         }
+     }
+
+     /// <summary>
+     /// Register the IObserver for the particular Time event.
+     /// Does nothing once the clock has been disposed.
+     /// </summary>
+     /// <typeparam name="U">The Time event type the observer handles</typeparam>
+     /// <param name="observer">The handler to register</param>
+     public void RegisterObserver<U>(IObserver<U> observer) where U : Time
+     {
+         if (_disposed)
+         {
+             return;
+         }
+
+         _handlers.Subscribe(observer);
+     }
+
+     /// <summary>
+     /// Start the RuntimeClock.
+     /// Clock will continue to run and handle events until it has been disposed.
+     /// </summary>
+     public void Run()
+     {
+         SubscribeHandlers();
+         _handlers.OnNext(new RuntimeStart(_timer.CurrentTime));
+         _handlers.OnNext(new StartTime(_timer.CurrentTime));
+
+         while (true)
+         {
+             if (IsIdle())
+             {
+                 _handlers.OnNext(new IdleClock(_timer.CurrentTime));
+             }
+
+             Time next;
+             lock (_schedule)
+             {
+                 // Blocks (releasing the lock while waiting) until the next event is due
+                 next = GetNextEvent();
+             }
+
+             // Handlers run outside the lock so that a slow or blocking handler
+             // cannot stall other threads calling ScheduleAlarm, IsIdle or Dispose.
+             ProcessEvent(next);
+
+             if (next is StopTime)
+             {
+                 break;
+             }
+         }
+         _handlers.OnNext(new RuntimeStop(_timer.CurrentTime));
+     }
+
+     /// <summary>
+     /// Register the event handlers
+     /// </summary>
+     private void SubscribeHandlers()
+     {
+         Subscribe(_startHandler.Get());
+         Subscribe(_stopHandler.Get());
+         Subscribe(_runtimeStartHandler.Get());
+         Subscribe(_runtimeStopHandler.Get());
+         Subscribe(_idleHandler.Get());
+     }
+
+     /// <summary>
+     /// Subscribe a set of IObservers for a particular Time event
+     /// </summary>
+     /// <typeparam name="U">The Time event type the observers handle</typeparam>
+     /// <param name="observers">The set of observers to subscribe</param>
+     private void Subscribe<U>(ISet<IObserver<U>> observers) where U : Time
+     {
+         foreach (IObserver<U> observer in observers)
+         {
+             _handlers.Subscribe(observer);
+         }
+     }
+
+     /// <summary>
+     /// Wait until the first scheduled alarm is ready to be handled, then remove and return it.
+     /// Assumes that we have a lock on the _schedule SortedSet
+     /// </summary>
+     /// <returns>The earliest scheduled event, once its timestamp has been reached</returns>
+     private Time GetNextEvent()
+     {
+         while (true)
+         {
+             // Nothing scheduled: wait until ScheduleAlarm or Dispose pulses the monitor
+             if (_schedule.Count == 0)
+             {
+                 Monitor.Wait(_schedule);
+                 continue;
+             }
+
+             Time earliest = _schedule.Min;
+             long duration = _timer.GetDuration(earliest.TimeStamp);
+             if (duration <= 0)
+             {
+                 _schedule.Remove(earliest);
+                 return earliest;
+             }
+
+             // Wait for the prescribed time. A pulse (new alarm or Dispose) ends the wait
+             // early and the loop then re-reads the earliest event. Monitor.Wait rejects
+             // timeouts above Int32.MaxValue milliseconds, so far-future alarms are
+             // waited for in several bounded steps.
+             int waitMilliseconds = (int)Math.Min(duration, (long)int.MaxValue);
+             Monitor.Wait(_schedule, waitMilliseconds);
+         }
+     }
+
+     /// <summary>
+     /// Process the next Time event.
+     /// Alarms invoke their own handler; every other event is published to the subscribed observers.
+     /// </summary>
+     /// <param name="time">The Time event to handle</param>
+     private void ProcessEvent(Time time)
+     {
+         Alarm alarm = time as Alarm;
+         if (alarm != null)
+         {
+             alarm.Handle();
+         }
+         else
+         {
+             _handlers.OnNext(time);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Time.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Time.cs b/lang/cs/Source/WAKE/Wake/Time/Time.cs
new file mode 100644
index 0000000..0ebc1a7
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Time.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Wake.Time
+{
+ /// <summary>
+ /// Base class for time-stamped events.
+ /// Instances order by TimeStamp first and, for equal timestamps, by creation order.
+ /// </summary>
+ public abstract class Time : IComparable<Time>
+ {
+     // Last sequence number handed out; incremented atomically for every new instance.
+     private static long _lastSequence;
+
+     // Unique per instance. Used as the tie-breaker in CompareTo so that two distinct
+     // events with the same timestamp never compare as equal (hash codes are not unique,
+     // and a sorted set silently drops elements that compare as equal).
+     private readonly long _sequence;
+
+     /// <summary>
+     /// Creates a time event with the given timestamp
+     /// </summary>
+     /// <param name="timeStamp">The timestamp of the event</param>
+     public Time(long timeStamp)
+     {
+         TimeStamp = timeStamp;
+         _sequence = System.Threading.Interlocked.Increment(ref _lastSequence);
+     }
+
+     /// <summary>
+     /// Gets the timestamp this event was created with
+     /// </summary>
+     public long TimeStamp { get; private set; }
+
+     /// <summary>
+     /// Formats the event as "TypeName:[TimeStamp]" using the invariant culture
+     /// </summary>
+     public override string ToString()
+     {
+         return string.Format(CultureInfo.InvariantCulture, "{0}:[{1}]", GetType().Name, TimeStamp);
+     }
+
+     public override int GetHashCode()
+     {
+         return base.GetHashCode();
+     }
+
+     /// <summary>
+     /// Two Time objects are equal when CompareTo reports zero, which only
+     /// happens for the same instance.
+     /// </summary>
+     /// <param name="obj">The object to compare with</param>
+     /// <returns>True if obj is a Time that compares equal to this one</returns>
+     public override bool Equals(object obj)
+     {
+         if (ReferenceEquals(this, obj))
+         {
+             return true;
+         }
+         Time other = obj as Time;
+         return other != null && CompareTo(other) == 0;
+     }
+
+     /// <summary>
+     /// Orders by TimeStamp, then by creation order for events with the same timestamp.
+     /// </summary>
+     /// <param name="other">The event to compare with; null sorts before any instance</param>
+     /// <returns>A negative value, zero or a positive value per the IComparable contract</returns>
+     public int CompareTo(Time other)
+     {
+         if (ReferenceEquals(other, null))
+         {
+             return 1;
+         }
+
+         int byTimeStamp = TimeStamp.CompareTo(other.TimeStamp);
+         if (byTimeStamp != 0)
+         {
+             return byTimeStamp;
+         }
+         return _sequence.CompareTo(other._sequence);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/Actionable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/Actionable.cs b/lang/cs/Source/WAKE/Wake/Util/Actionable.cs
new file mode 100644
index 0000000..5e5364c
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/Actionable.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// Wraps a ThreadStart delegate so it can be invoked through Call.
+ /// </summary>
+ public class Actionable
+ {
+     // Null when the instance was created through the parameterless constructor
+     private readonly ThreadStart _threadStart;
+
+     /// <summary>
+     /// Creates an Actionable with no delegate; Call does nothing for such an instance
+     /// </summary>
+     public Actionable()
+     {
+     }
+
+     /// <summary>
+     /// Creates an Actionable that invokes the given delegate
+     /// </summary>
+     /// <param name="threadStart">The delegate to invoke from Call</param>
+     internal Actionable(ThreadStart threadStart)
+     {
+         _threadStart = threadStart;
+     }
+
+     /// <summary>
+     /// Invokes the wrapped delegate synchronously on the calling thread.
+     /// </summary>
+     public void Call()
+     {
+         // The public constructor leaves the delegate unset; treat that as "nothing to do"
+         // instead of failing with a NullReferenceException.
+         if (_threadStart != null)
+         {
+             _threadStart();
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/Disposable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/Disposable.cs b/lang/cs/Source/WAKE/Wake/Util/Disposable.cs
new file mode 100644
index 0000000..ebe30bc
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/Disposable.cs
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// Generates IDisposables from a factory method.
+ /// The supplied dispose function runs at most once per instance.
+ /// </summary>
+ internal class Disposable : IDisposable
+ {
+     private readonly Action _disposeFunction;
+     private bool _disposed;
+
+     private Disposable(Action disposeFunction)
+     {
+         _disposeFunction = disposeFunction;
+         _disposed = false;
+     }
+
+     /// <summary>
+     /// Factory method to create an IDisposable from a function.
+     /// </summary>
+     /// <param name="disposeFunction">The function to call when disposing</param>
+     /// <returns>An IDisposable from the given dispose function</returns>
+     public static IDisposable Create(Action disposeFunction)
+     {
+         return new Disposable(disposeFunction);
+     }
+
+     /// <summary>
+     /// Dispose of resources by calling the supplied dispose function.
+     /// Second and later calls do nothing.
+     /// </summary>
+     public void Dispose()
+     {
+         if (_disposed)
+         {
+             return;
+         }
+
+         // Mark as disposed before invoking, so repeated or re-entrant calls
+         // do not run the dispose function a second time.
+         _disposed = true;
+         if (_disposeFunction != null)
+         {
+             _disposeFunction();
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/FixedThreadPoolTaskService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/FixedThreadPoolTaskService.cs b/lang/cs/Source/WAKE/Wake/Util/FixedThreadPoolTaskService.cs
new file mode 100644
index 0000000..e86820d
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/FixedThreadPoolTaskService.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// ITaskService that runs submitted functions on a TaskFactory backed by a
+ /// LimitedConcurrencyLevelTaskScheduler with a fixed degree of parallelism.
+ /// </summary>
+ public class FixedThreadPoolTaskService : ITaskService
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(FixedThreadPoolTaskService));
+
+     private readonly TaskFactory factory;
+
+     // Tasks that are still pending or running, plus tasks that faulted or were cancelled.
+     // The list instance is also the lock that guards itself and shuttingDown.
+     private readonly List<Task> tasks = new List<Task>();
+     private bool shuttingDown;
+
+     /// <summary>
+     /// Creates a task service with the given maximum number of concurrently running tasks
+     /// </summary>
+     /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism of the underlying scheduler</param>
+     internal FixedThreadPoolTaskService(int maxDegreeOfParallelism)
+     {
+         LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism);
+         factory = new TaskFactory(scheduler);
+     }
+
+     /// <summary>
+     /// Waits for the tracked tasks to complete.
+     /// </summary>
+     /// <param name="n">
+     /// NOTE(review): currently unused; the wait is bounded by <paramref name="unit"/> alone.
+     /// The parameter names suggest the intended timeout was n times unit - confirm with callers.
+     /// </param>
+     /// <param name="unit">The maximum time to wait</param>
+     /// <returns>True if no tasks are tracked or all tracked tasks completed within the timeout</returns>
+     public bool AwaitTermination(long n, TimeSpan unit)
+     {
+         Task[] allTasks;
+         lock (tasks)
+         {
+             if (tasks.Count == 0)
+             {
+                 return true;
+             }
+             // Snapshot under the lock; wait outside it so Submit is not blocked
+             allTasks = tasks.ToArray();
+         }
+         return Task.WaitAll(allTasks, unit);
+     }
+
+     /// <summary>
+     /// Same as Shutdown: stops accepting new work. Already submitted tasks are not interrupted.
+     /// </summary>
+     public void ShutdownNow()
+     {
+         Shutdown();
+     }
+
+     /// <summary>
+     /// Marks the service as shutting down so that later Submit calls are rejected
+     /// </summary>
+     public void Shutdown()
+     {
+         lock (tasks)
+         {
+             shuttingDown = true;
+         }
+     }
+
+     /// <summary>
+     /// Starts the given function on the limited-concurrency scheduler and tracks the resulting task.
+     /// </summary>
+     /// <typeparam name="T">The result type of the function</typeparam>
+     /// <param name="c">The function to run</param>
+     /// <returns>The task that runs the function</returns>
+     public Task<T> Submit<T>(Func<T> c)
+     {
+         Task<T> task = null;
+         lock (tasks)
+         {
+             if (shuttingDown)
+             {
+                 Exceptions.Throw(new InvalidOperationException("Shutting down"), LOGGER);
+             }
+
+             task = factory.StartNew(c);
+             tasks.Add(task);
+
+             // Stop tracking tasks that finished successfully; otherwise the list grows
+             // with every submission for the lifetime of the service. Faulted or cancelled
+             // tasks stay tracked so that AwaitTermination still surfaces their exceptions.
+             task.ContinueWith(
+                 (Task<T> finished) => RemoveTask(finished),
+                 CancellationToken.None,
+                 TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously,
+                 TaskScheduler.Default);
+         }
+         return task;
+     }
+
+     /// <summary>
+     /// Invokes the delegate synchronously on the calling thread.
+     /// NOTE(review): this bypasses the scheduler and the shutdown flag - confirm that is intended.
+     /// </summary>
+     /// <param name="threadStart">The delegate to invoke</param>
+     public void Execute(ThreadStart threadStart)
+     {
+         new Actionable(threadStart).Call();
+     }
+
+     /// <summary>
+     /// Stops tracking the given task
+     /// </summary>
+     /// <param name="task">The task to remove from the tracked list</param>
+     internal void RemoveTask(Task task)
+     {
+         lock (tasks)
+         {
+             tasks.Remove(task);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/IStartable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/IStartable.cs b/lang/cs/Source/WAKE/Wake/Util/IStartable.cs
new file mode 100644
index 0000000..2327819
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/IStartable.cs
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// Implemented by components that expose a parameterless Start operation.
+ /// </summary>
+ public interface IStartable
+ {
+ /// <summary>
+ /// Starts the component. What starting entails is defined by the implementing type.
+ /// </summary>
+ void Start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/ITaskService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/ITaskService.cs b/lang/cs/Source/WAKE/Wake/Util/ITaskService.cs
new file mode 100644
index 0000000..2ad0e1e
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/ITaskService.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// Contract for a service that accepts units of work and can be shut down.
+ /// </summary>
+ public interface ITaskService
+ {
+ /// <summary>
+ /// Shuts the service down. The precise effect on queued or running work
+ /// is defined by the implementation.
+ /// </summary>
+ void Shutdown();
+
+ /// <summary>
+ /// Executes the given delegate. Whether it runs synchronously or on another
+ /// thread is defined by the implementation.
+ /// </summary>
+ /// <param name="threadStart">The delegate to execute</param>
+ void Execute(ThreadStart threadStart);
+
+ /// <summary>
+ /// Submits a function for execution and returns the task that represents it.
+ /// </summary>
+ /// <typeparam name="T">The result type of the function</typeparam>
+ /// <param name="ob">The function to execute</param>
+ /// <returns>A task that yields the function's result</returns>
+ Task<T> Submit<T>(Func<T> ob);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs b/lang/cs/Source/WAKE/Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs
new file mode 100644
index 0000000..4bd3083
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/LimitedConcurrencyLevelTaskScheduler.cs
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// TaskScheduler that runs queued tasks on ThreadPool threads while keeping the number
+ /// of concurrently active worker delegates at or below a configured maximum.
+ /// </summary>
+ internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(LimitedConcurrencyLevelTaskScheduler));
+
+ /// <summary>Whether the current thread is processing work items.</summary>
+ // [ThreadStatic] gives every thread its own copy of this flag.
+ [ThreadStatic]
+ private static bool _currentThreadIsProcessingItems;
+
+ /// <summary>The list of tasks to be executed.</summary>
+ private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
+
+ /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
+ private readonly int _maxDegreeOfParallelism;
+
+ /// <summary>The number of worker delegates currently queued to, or running on, the ThreadPool.</summary>
+ private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
+
+ /// <summary>
+ /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
+ /// specified degree of parallelism.
+ /// </summary>
+ /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
+ public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
+ {
+ // Values below 1 are reported as ArgumentOutOfRangeException through Exceptions.Throw
+ if (maxDegreeOfParallelism < 1)
+ {
+ Exceptions.Throw(new ArgumentOutOfRangeException("maxDegreeOfParallelism"), LOGGER);
+ }
+ _maxDegreeOfParallelism = maxDegreeOfParallelism;
+ }
+
+ /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
+ public sealed override int MaximumConcurrencyLevel
+ {
+ get
+ {
+ return _maxDegreeOfParallelism;
+ }
+ }
+
+ /// <summary>Queues a task to the scheduler.</summary>
+ /// <param name="task">The task to be queued.</param>
+ protected sealed override void QueueTask(Task task)
+ {
+ // Add the task to the list of tasks to be processed. If there aren't enough
+ // delegates currently queued or running to process tasks, schedule another.
+ lock (_tasks)
+ {
+ _tasks.AddLast(task);
+ if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
+ {
+ // Count the new worker before it is queued, while still holding the lock
+ ++_delegatesQueuedOrRunning;
+ NotifyThreadPoolOfPendingWork();
+ }
+ }
+ }
+
+ /// <summary>Attempts to execute the specified task on the current thread.</summary>
+ /// <param name="task">The task to be executed.</param>
+ /// <param name="taskWasPreviouslyQueued">Whether the task was queued earlier; if so it is removed from the queue before running.</param>
+ /// <returns>Whether the task could be executed on the current thread.</returns>
+ protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
+ {
+ // If this thread isn't already processing a task, we don't support inlining
+ if (!_currentThreadIsProcessingItems)
+ {
+ return false;
+ }
+
+ // If the task was previously queued, remove it from the queue
+ if (taskWasPreviouslyQueued)
+ {
+ TryDequeue(task);
+ }
+
+ // Try to run the task.
+ return TryExecuteTask(task);
+ }
+
+ /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
+ /// <param name="task">The task to be removed.</param>
+ /// <returns>Whether the task could be found and removed.</returns>
+ protected sealed override bool TryDequeue(Task task)
+ {
+ lock (_tasks)
+ {
+ return _tasks.Remove(task);
+ }
+ }
+
+ /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
+ /// <returns>An enumerable of the tasks currently scheduled.</returns>
+ /// <exception cref="NotSupportedException">The task list lock could not be acquired without blocking.</exception>
+ protected sealed override IEnumerable<Task> GetScheduledTasks()
+ {
+ bool lockTaken = false;
+ try
+ {
+ // Non-blocking attempt: this method never waits for the lock
+ Monitor.TryEnter(_tasks, ref lockTaken);
+ if (lockTaken)
+ {
+ // Return a snapshot so the caller never enumerates the live list
+ return _tasks.ToArray();
+ }
+ else
+ {
+ throw new NotSupportedException();
+ }
+ }
+ finally
+ {
+ // Only release the lock if TryEnter actually acquired it
+ if (lockTaken)
+ {
+ Monitor.Exit(_tasks);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Informs the ThreadPool that there's work to be executed for this scheduler.
+ /// The queued work item drains the task list and then retires itself.
+ /// </summary>
+ private void NotifyThreadPoolOfPendingWork()
+ {
+ ThreadPool.UnsafeQueueUserWorkItem(_ =>
+ {
+ // Note that the current thread is now processing work items.
+ // This is necessary to enable inlining of tasks into this thread.
+ _currentThreadIsProcessingItems = true;
+ try
+ {
+ // Process all available items in the queue.
+ while (true)
+ {
+ Task item;
+ lock (_tasks)
+ {
+ // When there are no more items to be processed,
+ // note that we're done processing, and get out.
+ if (_tasks.Count == 0)
+ {
+ --_delegatesQueuedOrRunning;
+ break;
+ }
+ // Get the next item from the queue
+ item = _tasks.First.Value;
+ _tasks.RemoveFirst();
+ }
+ // Execute the task we pulled out of the queue
+ // (outside the lock, so other threads can queue or dequeue meanwhile)
+ base.TryExecuteTask(item);
+ }
+ }
+ // We're done processing items on the current thread
+ finally
+ {
+ _currentThreadIsProcessingItems = false;
+ }
+ }, null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/NetworkUtils.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/NetworkUtils.cs b/lang/cs/Source/WAKE/Wake/Util/NetworkUtils.cs
new file mode 100644
index 0000000..c598c49
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/NetworkUtils.cs
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// Helpers for discovering the local IPv4 address and picking random ports.
+ /// </summary>
+ public class NetworkUtils
+ {
+     private static IPAddress _localAddress;
+
+     // System.Random is not thread-safe; every use of _random goes through _randomLock.
+     private static readonly Random _random = new Random();
+     private static readonly object _randomLock = new object();
+
+     /// <summary>
+     /// Returns the first usable IP Address for the machine.
+     /// The IPv4 addresses of the local host name are ordered by their string form and
+     /// the first one is cached for later calls.
+     /// </summary>
+     /// <returns>The machine's local IP Address</returns>
+     /// <exception cref="InvalidOperationException">The host has no IPv4 address</exception>
+     public static IPAddress LocalIPAddress
+     {
+         get
+         {
+             if (_localAddress == null)
+             {
+                 IPAddress[] localIps = Dns.GetHostAddresses(Dns.GetHostName());
+                 _localAddress = localIps.Where(i => i.AddressFamily.Equals(AddressFamily.InterNetwork))
+                                         .OrderBy(ip => ip.ToString())
+                                         .First();
+             }
+
+             return _localAddress;
+         }
+     }
+
+     /// <summary>
+     /// Generate a random port between low (inclusive) and high (exclusive)
+     /// </summary>
+     /// <param name="low">The inclusive lower bound of the of the port range</param>
+     /// <param name="high">The exclusive upper bound of the port range</param>
+     /// <returns>The randomly generated port</returns>
+     public static int GenerateRandomPort(int low, int high)
+     {
+         // Unsynchronized concurrent calls can corrupt the generator's internal state,
+         // so access to the shared instance is serialized.
+         lock (_randomLock)
+         {
+             return _random.Next(low, high);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/SerializationHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/SerializationHelper.cs b/lang/cs/Source/WAKE/Wake/Util/SerializationHelper.cs
new file mode 100644
index 0000000..cd9e220
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/SerializationHelper.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// Convenience wrappers around the ProtoBuf <c>Serializer</c> that work on
+ /// in-memory byte arrays instead of streams.
+ /// </summary>
+ public class SerializationHelper
+ {
+     /// <summary>
+     /// Serializes an object into a byte array.
+     /// </summary>
+     /// <typeparam name="T">The type of the object to serialize</typeparam>
+     /// <param name="t">The object to serialize</param>
+     /// <returns>The serialized bytes</returns>
+     public static byte[] Serialize<T>(T t)
+     {
+         byte[] payload;
+         using (var buffer = new MemoryStream())
+         {
+             Serializer.Serialize(buffer, t);
+             payload = buffer.ToArray();
+         }
+
+         return payload;
+     }
+
+     /// <summary>
+     /// Reconstructs an object from a byte array.
+     /// </summary>
+     /// <typeparam name="T">The type of the object to deserialize</typeparam>
+     /// <param name="bytes">The serialized bytes</param>
+     /// <returns>The deserialized object</returns>
+     public static T Deserialize<T>(byte[] bytes)
+     {
+         T result;
+         using (var input = new MemoryStream(bytes))
+         {
+             result = Serializer.Deserialize<T>(input);
+         }
+
+         return result;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/TaskExtensions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/TaskExtensions.cs b/lang/cs/Source/WAKE/Wake/Util/TaskExtensions.cs
new file mode 100644
index 0000000..0c1da40
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/TaskExtensions.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// Extension methods for <see cref="Task"/>.
+ /// </summary>
+ public static class TaskExtensions
+ {
+ /// <summary>
+ /// Does nothing with the given task; the body is intentionally empty.
+ /// </summary>
+ /// <remarks>
+ /// NOTE(review): presumably this lets call sites mark a task as deliberately
+ /// not awaited (fire-and-forget) -- confirm against callers. Nothing here
+ /// observes the task's result or any exception it may throw.
+ /// </remarks>
+ /// <param name="task">The task to ignore</param>
+ public static void Forget(this Task task)
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Util/TimeHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Util/TimeHelper.cs b/lang/cs/Source/WAKE/Wake/Util/TimeHelper.cs
new file mode 100644
index 0000000..2b437fa
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Util/TimeHelper.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Org.Apache.Reef.Wake.Util
+{
+ /// <summary>
+ /// Conversions between .NET ticks and nanoseconds. One tick is 100 nanoseconds.
+ /// </summary>
+ public class TimeHelper
+ {
+     public const long TicksPerMilliSecond = 10000;
+     public const long TicksPerMicroSecond = 10;
+     public const double TicksPerNanoSecond = .01;
+
+     // Integral inverse of TicksPerNanoSecond; keeps the long conversion exact.
+     private const long NanoSecondsPerTick = 100;
+
+     // Reference point for CurrentTimeToNanoSeconds. Nanoseconds counted from
+     // DateTime.MinValue do not fit into a long, so a recent epoch is required.
+     private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+
+     /// <summary>
+     /// Gets the current time in nanoseconds elapsed since 1970-01-01T00:00:00 UTC.
+     /// </summary>
+     /// <remarks>
+     /// The resolution is that of the system clock (at best 100 ns). The value is
+     /// expressed in the same unit as <see cref="AsLongNanoSeconds"/>, so the two
+     /// can be added to compute deadlines.
+     /// </remarks>
+     public static long CurrentTimeToNanoSeconds
+     {
+         get
+         {
+             return (DateTime.UtcNow - UnixEpoch).Ticks * NanoSecondsPerTick;
+         }
+     }
+
+     /// <summary>
+     /// Converts a time span to whole nanoseconds.
+     /// </summary>
+     /// <param name="timeSpan">The time span to convert</param>
+     /// <returns>
+     /// The length of the time span in nanoseconds, clamped to the range of
+     /// <see cref="long"/> for spans too large to represent (roughly 292 years).
+     /// </returns>
+     public static long AsLongNanoSeconds(TimeSpan timeSpan)
+     {
+         long ticks = timeSpan.Ticks;
+
+         // Saturate instead of silently wrapping around on overflow.
+         if (ticks > long.MaxValue / NanoSecondsPerTick)
+         {
+             return long.MaxValue;
+         }
+
+         if (ticks < long.MinValue / NanoSecondsPerTick)
+         {
+             return long.MinValue;
+         }
+
+         return ticks * NanoSecondsPerTick;
+     }
+
+     /// <summary>
+     /// Converts a time span to nanoseconds as a floating point value.
+     /// </summary>
+     /// <param name="timeSpan">The time span to convert</param>
+     /// <returns>The length of the time span in nanoseconds</returns>
+     public static double AsDoubleNanoSeconds(TimeSpan timeSpan)
+     {
+         return timeSpan.Ticks * (double)NanoSecondsPerTick;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Wake.csproj b/lang/cs/Source/WAKE/Wake/Wake.csproj
new file mode 100644
index 0000000..689df3c
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Wake.csproj
@@ -0,0 +1,214 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{CDFB3464-4041-42B1-9271-83AF24CD5008}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.Reef.Wake</RootNamespace>
+ <AssemblyName>Org.Apache.Reef.Wake</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\</SolutionDir>
+ <RestorePackages>true</RestorePackages>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>..\..\..\bin\Debug\Org.Apache.Reef.Wake\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <!-- Output folder is named after the assembly, matching the Debug configuration. -->
+ <OutputPath>..\..\..\bin\Release\Org.Apache.Reef.Wake\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="protobuf-net">
+ <HintPath>..\..\..\packages\protobuf-net.2.0.0.668\lib\net40\protobuf-net.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Reactive.Core">
+ <HintPath>..\..\..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Interfaces">
+ <HintPath>..\..\..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Runtime.Serialization" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="AbstractEStage.cs" />
+ <Compile Include="IEStage.cs" />
+ <Compile Include="IEventHandler.cs" />
+ <Compile Include="IIdentifier.cs" />
+ <Compile Include="IIdentifierFactory.cs" />
+ <Compile Include="Impl\LoggingEventHandler.cs" />
+ <Compile Include="Impl\MissingStartHandlerHandler.cs" />
+ <Compile Include="Impl\MultiEventHandler.cs" />
+ <Compile Include="Impl\PeriodicEvent.cs" />
+ <Compile Include="Impl\PubSubEventHandler.cs" />
+ <Compile Include="Impl\SingleThreadStage.cs" />
+ <Compile Include="Impl\SyncStage.cs" />
+ <Compile Include="Impl\ThreadPoolStage.cs" />
+ <Compile Include="Impl\TimerStage.cs" />
+ <Compile Include="IObserverFactory.cs" />
+ <Compile Include="IStage.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Protobuf\WakeRemoteProtosGen.cs" />
+ <Compile Include="Remote\ICodec.cs" />
+ <Compile Include="Remote\ICodecFactory.cs" />
+ <Compile Include="Remote\IDecoder.cs" />
+ <Compile Include="Remote\IEncoder.cs" />
+ <Compile Include="Remote\ILink.cs" />
+ <Compile Include="Remote\Impl\ByteCodec.cs" />
+ <Compile Include="Remote\Impl\ByteCodecFactory.cs" />
+ <Compile Include="Remote\Impl\Channel.cs" />
+ <Compile Include="Remote\Impl\DefaultRemoteManager.cs" />
+ <Compile Include="Remote\Impl\DefaultRemoteMessage.cs" />
+ <Compile Include="Remote\Impl\IntCodec.cs" />
+ <Compile Include="Remote\Impl\IPEndpointComparer.cs" />
+ <Compile Include="Remote\Impl\Link.cs" />
+ <Compile Include="Remote\Impl\MultiCodec.cs" />
+ <Compile Include="Remote\Impl\MultiDecoder.cs" />
+ <Compile Include="Remote\Impl\MultiEncoder.cs" />
+ <Compile Include="Remote\Impl\ObserverContainer.cs" />
+ <Compile Include="Remote\Impl\RemoteEvent.cs" />
+ <Compile Include="Remote\Impl\RemoteEventCodec.cs" />
+ <Compile Include="Remote\Impl\RemoteEventDecoder.cs" />
+ <Compile Include="Remote\Impl\RemoteEventEncoder.cs" />
+ <Compile Include="Remote\Impl\RemoteEventEndpoint.cs" />
+ <Compile Include="Remote\Impl\SocketRemoteIdentifier.cs" />
+ <Compile Include="Remote\Impl\StringCodec.cs" />
+ <Compile Include="Remote\Impl\StringIdentifier.cs" />
+ <Compile Include="Remote\Impl\StringIdentifierFactory.cs" />
+ <Compile Include="Remote\Impl\TransportClient.cs" />
+ <Compile Include="Remote\Impl\TransportEvent.cs" />
+ <Compile Include="Remote\Impl\TransportServer.cs" />
+ <Compile Include="Remote\IRemoteEvent.cs" />
+ <Compile Include="Remote\IRemoteIdentifier.cs" />
+ <Compile Include="Remote\IRemoteIdentifierFactory.cs" />
+ <Compile Include="Remote\IRemoteManager.cs" />
+ <Compile Include="Remote\IRemoteMessage.cs" />
+ <Compile Include="Remote\ISubscriptionManager.cs" />
+ <Compile Include="Remote\Proto\WakeRemoteProtos.cs" />
+ <Compile Include="Remote\RemoteConfiguration.cs" />
+ <Compile Include="Remote\RemoteRuntimeException.cs" />
+ <Compile Include="RX\AbstractObserver.cs" />
+ <Compile Include="RX\AbstractRxStage.cs" />
+ <Compile Include="RX\Impl\PubSubSubject.cs" />
+ <Compile Include="RX\Impl\RxSyncStage.cs" />
+ <Compile Include="RX\Impl\RxThreadPoolStage.cs" />
+ <Compile Include="RX\Impl\RxTimerStage.cs" />
+ <Compile Include="RX\Impl\SimpleSubject.cs" />
+ <Compile Include="RX\IRxStage.cs" />
+ <Compile Include="RX\IStaticObservable.cs" />
+ <Compile Include="RX\ISubject.cs" />
+ <Compile Include="RX\ObserverCompletedException.cs" />
+ <Compile Include="src\main\cs\Examples\P2p\IEventSource.cs" />
+ <Compile Include="src\main\cs\Examples\P2p\Pull2Push.cs" />
+ <Compile Include="src\main\cs\PeriodicEvent.cs" />
+ <Compile Include="Time\Event\Alarm.cs" />
+ <Compile Include="Time\Event\StartTime.cs" />
+ <Compile Include="Time\Event\StopTime.cs" />
+ <Compile Include="Time\IClock.cs" />
+ <Compile Include="Time\Runtime\Event\ClientAlarm.cs" />
+ <Compile Include="Time\Runtime\Event\IdleClock.cs" />
+ <Compile Include="Time\Runtime\Event\RuntimeAlarm.cs" />
+ <Compile Include="Time\Runtime\Event\RuntimeStart.cs" />
+ <Compile Include="Time\Runtime\Event\RuntimeStop.cs" />
+ <Compile Include="Time\Runtime\ITimer.cs" />
+ <Compile Include="Time\Runtime\LogicalTimer.cs" />
+ <Compile Include="Time\Runtime\RealTimer.cs" />
+ <Compile Include="Time\Runtime\RuntimeClock.cs" />
+ <Compile Include="Time\Time.cs" />
+ <Compile Include="Util\Actionable.cs" />
+ <Compile Include="Util\Disposable.cs" />
+ <Compile Include="Util\FixedThreadPoolTaskService.cs" />
+ <Compile Include="Util\IStartable.cs" />
+ <Compile Include="Util\ITaskService.cs" />
+ <Compile Include="Util\LimitedConcurrencyLevelTaskScheduler.cs" />
+ <Compile Include="Util\NetworkUtils.cs" />
+ <Compile Include="Util\SerializationHelper.cs" />
+ <Compile Include="Util\TaskExtensions.cs" />
+ <Compile Include="Util\TimeHelper.cs" />
+ <Compile Include="WakeRuntimeException.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ <None Include="Protobuf\RemoteProtocol.proto" />
+ <None Include="testkey.snk" />
+ </ItemGroup>
+ <ItemGroup>
+ <Folder Include="Impl\Impl\" />
+ <Folder Include="RX\RX\Impl\" />
+ <Folder Include="Time\Time\Event\" />
+ <Folder Include="Time\Time\Runtime\Event\" />
+ <Folder Include="Util\Util\" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\Tang\Tang\Tang.csproj">
+ <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+ <Name>Tang</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\..\Utilities\Utilities.csproj">
+ <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+ <Name>Utilities</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <PropertyGroup>
+ <MainProtoBufDir>.\target\generated-sources\proto\main\cs\Wake</MainProtoBufDir>
+ <TestProtoBufDir>.\target\generated-sources\proto\test\cs\Wake</TestProtoBufDir>
+ <TestRemoteDir>..\..\wake\target\generated-sources\proto\test\cs\Wake</TestRemoteDir>
+ </PropertyGroup>
+ <Target Name="ProtoBuf">
+ <MakeDir Directories="$(MainProtoBufDir)" Condition="!Exists('$(MainProtoBufDir)')" />
+ <MakeDir Directories="$(TestProtoBufDir)" Condition="!Exists('$(TestProtoBufDir)')" />
+ <MakeDir Directories="$(TestRemoteDir)" Condition="!Exists('$(TestRemoteDir)')" />
+ <Exec Command="protogen -i:.\src\main\proto\RemoteProtocol.proto -o:$(MainProtoBufDir)\WakeRemoteProtosGen.cs -ns:Wake.Remote.Proto.WakeRemoteProtos" />
+ <!--<Exec Command="protogen -i:.\src\test\proto\TestProtocol.proto -o:$(TestProtoBufDir)\TestProtosGen.cs -ns:Wake.Test.Proto.TestProtos" />
+ <Exec Command="protogen -i:.\src\test\proto\TestEvent1.proto -o:$(TestRemoteDir)\TestEvent1.pb.cs -ns:Wake.Test.Remote.TestRemote" />-->
+ </Target>
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/WakeRuntimeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/WakeRuntimeException.cs b/lang/cs/Source/WAKE/Wake/WakeRuntimeException.cs
new file mode 100644
index 0000000..0ac2c13
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/WakeRuntimeException.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake
+{
+ /// <summary>Wake runtime exception</summary>
+ [System.Serializable]
+ public class WakeRuntimeException : Exception
+ {
+     /// <summary>Constructs a new runtime wake exception with the specified detail message and cause
+     /// </summary>
+     /// <param name="s">the detailed message</param>
+     /// <param name="e">the cause</param>
+     public WakeRuntimeException(string s, Exception e)
+         : base(s, e)
+     {
+     }
+
+     /// <summary>Constructs a new runtime wake exception with the specified detail message
+     /// </summary>
+     /// <param name="s">the detailed message</param>
+     public WakeRuntimeException(string s)
+         : base(s)
+     {
+     }
+
+     /// <summary>Constructs a new runtime wake exception with the specified cause</summary>
+     /// <param name="e">the cause</param>
+     public WakeRuntimeException(Exception e)
+         : base("Runtime Exception", e)
+     {
+     }
+
+     /// <summary>
+     /// Constructs a runtime wake exception from serialized data. Required for a
+     /// type marked [Serializable] to be deserialized, e.g. when the exception
+     /// crosses an AppDomain or remoting boundary.
+     /// </summary>
+     /// <param name="info">the serialized object data</param>
+     /// <param name="context">the source or destination of the serialized data</param>
+     protected WakeRuntimeException(
+         System.Runtime.Serialization.SerializationInfo info,
+         System.Runtime.Serialization.StreamingContext context)
+         : base(info, context)
+     {
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/packages.config b/lang/cs/Source/WAKE/Wake/packages.config
new file mode 100644
index 0000000..fd78097
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/packages.config
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<packages>
+ <package id="protobuf-net" version="2.0.0.668" targetFramework="net45" />
+ <package id="Rx-Core" version="2.2.5" targetFramework="net45" />
+ <package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
+</packages>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/IEventSource.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/IEventSource.cs b/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/IEventSource.cs
new file mode 100644
index 0000000..d761032
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/IEventSource.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Wake.Examples.P2p
+{
+ /// <summary>
+ /// The pull side of the interface: Clients implement this and register it with
+ /// the Pull2Push class.
+ /// </summary>
+ /// <typeparam name="T">The event type</typeparam>
+ public interface IEventSource<T>
+ {
+ /// <summary>
+ /// Gets the next event
+ /// </summary>
+ /// <remarks>
+ /// Pull2Push treats a null return value as the end of this source and stops
+ /// polling it.
+ /// </remarks>
+ /// <returns>The next event</returns>
+ T GetNext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/Pull2Push.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/Pull2Push.cs b/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/Pull2Push.cs
new file mode 100644
index 0000000..3f596f2
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/src/main/cs/Examples/P2p/Pull2Push.cs
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake;
+using Org.Apache.Reef.Wake.Util;
+using System;
+using System.Collections.Generic;
+
+namespace Wake.Examples.P2p
+{
+ /// <summary>Performs a Pull-to-Push conversion in Wake.</summary>
+ /// <remarks>
+ /// The class pulls from a set of event sources, and pushes to a single
+ /// EventHandler. If the downstream event handler blocks, this will block,
+ /// providing a simple rate limiting scheme.
+ /// The EventSources are managed in a basic Queue and polled round-robin.
+ /// <see cref="Start"/> runs the message loop on the calling thread until
+ /// <see cref="Dispose"/> is called, so <see cref="Register"/> and
+ /// <see cref="Dispose"/> may be invoked from other threads.
+ /// </remarks>
+ /// <typeparam name="T">The event type</typeparam>
+ public sealed class Pull2Push<T> : IStartable, IDisposable
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(Pull2Push<T>));
+
+     // The downstream EventHandler
+     private readonly IEventHandler<T> _output;
+
+     // The upstream event sources
+     private readonly Queue<IEventSource<T>> _sources = new Queue<IEventSource<T>>();
+
+     // Guards _sources; Queue<T> is not safe for concurrent access.
+     private readonly object _sourcesLock = new object();
+
+     // volatile so that the message loop observes a Dispose() from another thread.
+     private volatile bool _closed = false;
+
+     /// <summary>
+     /// Constructs a new Pull2Push object
+     /// </summary>
+     /// <param name="output">
+     /// the EventHandler that receives the messages from this
+     /// Pull2Push.
+     /// </param>
+     public Pull2Push(IEventHandler<T> output)
+     {
+         _output = output;
+     }
+
+     /// <summary>Registers an event source.</summary>
+     /// <param name="source">
+     /// The source that will be added to the queue of this
+     /// Pull2Push
+     /// </param>
+     public void Register(IEventSource<T> source)
+     {
+         lock (_sourcesLock)
+         {
+             _sources.Enqueue(source);
+         }
+     }
+
+     /// <summary>Executes the message loop.</summary>
+     /// <remarks>
+     /// Blocks the calling thread until <see cref="Dispose"/> is called. A source
+     /// whose <c>GetNext</c> returns null is dropped from the queue.
+     /// </remarks>
+     public void Start()
+     {
+         while (!_closed)
+         {
+             // Grab the next available message source, if any. Queue<T>.Dequeue
+             // throws on an empty queue, so the count has to be checked first.
+             IEventSource<T> nextSource = null;
+             lock (_sourcesLock)
+             {
+                 if (_sources.Count > 0)
+                 {
+                     nextSource = _sources.Dequeue();
+                 }
+             }
+
+             if (null == nextSource)
+             {
+                 // No source was available. Give up the time slice instead of
+                 // spinning at full speed until one is registered.
+                 System.Threading.Thread.Yield();
+                 continue;
+             }
+
+             // Grab the next message from that source, if any
+             T message = nextSource.GetNext();
+             if (null != message)
+             {
+                 // Add the source to the end of the queue again.
+                 lock (_sourcesLock)
+                 {
+                     _sources.Enqueue(nextSource);
+                 }
+
+                 // Send the message. Note that this may block depending on the underlying EventHandler.
+                 // Deliberately done outside the lock so Register() is never blocked by the handler.
+                 _output.OnNext(message);
+             }
+             else
+             {
+                 // The message source has returned null as the next message. We drop the message source in that case.
+                 LOGGER.Log(Level.Info, string.Format("Dropping message source {0} from the queue", nextSource));
+             }
+         }
+     }
+
+     /// <summary>Signals the message loop in <see cref="Start"/> to exit.</summary>
+     public void Dispose()
+     {
+         _closed = true;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/src/main/cs/PeriodicEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/src/main/cs/PeriodicEvent.cs b/lang/cs/Source/WAKE/Wake/src/main/cs/PeriodicEvent.cs
new file mode 100644
index 0000000..a91e298
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/src/main/cs/PeriodicEvent.cs
@@ -0,0 +1,23 @@
+/**
+ * Copyright 2013 Microsoft.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Wake.Impl
+{
+ /// <summary>Periodic event for timers</summary>
+ /// <remarks>
+ /// Marker type without any members.
+ /// NOTE(review): Wake.csproj compiles both Impl\PeriodicEvent.cs and this file,
+ /// and this one still uses the Wake.Impl namespace -- confirm whether this
+ /// copy is a leftover duplicate.
+ /// </remarks>
+ public class PeriodicEvent
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/testkey.snk
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/testkey.snk b/lang/cs/Source/WAKE/Wake/testkey.snk
new file mode 100644
index 0000000..133423f
Binary files /dev/null and b/lang/cs/Source/WAKE/Wake/testkey.snk differ
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/ConfigFiles/evaluator.conf
----------------------------------------------------------------------
diff --git a/lang/cs/Tests/ReefTests/ConfigFiles/evaluator.conf b/lang/cs/Tests/ReefTests/ConfigFiles/evaluator.conf
new file mode 100644
index 0000000..67256f5
Binary files /dev/null and b/lang/cs/Tests/ReefTests/ConfigFiles/evaluator.conf differ
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorConfigurationsTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorConfigurationsTests.cs b/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorConfigurationsTests.cs
new file mode 100644
index 0000000..69fc9ae
--- /dev/null
+++ b/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorConfigurationsTests.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace Org.Apache.Reef.Test
+{
+ [TestClass]
+ public class EvaluatorConfigurationsTests
+ {
+     /// <summary>
+     /// Loads the deployed "evaluator.conf" and checks the evaluator id, the
+     /// application id and that a root context configuration is present.
+     /// </summary>
+     [TestMethod, Priority(0), TestCategory("Unit")]
+     [DeploymentItem(@"ConfigFiles")]
+     public void TestEvaluatorConfigurations()
+     {
+         var parsed = new EvaluatorConfigurations("evaluator.conf");
+
+         bool evaluatorIdMatches = parsed.EvaluatorId.Equals("Node-1-1414443998204");
+         Assert.IsTrue(evaluatorIdMatches);
+
+         bool applicationIdMatches = parsed.ApplicationId.Equals("REEF_LOCAL_RUNTIME");
+         Assert.IsTrue(applicationIdMatches);
+
+         string rootContext = parsed.RootContextConfiguration;
+         bool rootContextIsBlank = string.IsNullOrWhiteSpace(rootContext);
+         Assert.IsFalse(rootContextIsBlank);
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorTests.cs b/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorTests.cs
new file mode 100644
index 0000000..2c3eaa2
--- /dev/null
+++ b/lang/cs/Tests/ReefTests/Evaluator.Tests/EvaluatorTests.cs
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Avro;
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Implementations;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Util;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System;
+using System.IO;
+
+namespace Org.Apache.Reef.Test
+{
+ [TestClass]
+ public class EvaluatorTests
+ {
+     /// <summary>
+     /// Builds a task configuration for a ShellTask running "dir", injects the
+     /// task from its serialized configuration and checks that the command
+     /// output lists the deployed "evaluator.conf" file.
+     /// </summary>
+     [TestMethod, Priority(0), TestCategory("Functional")]
+     [Description("Parse Evaluator configuration from Java, inject and execute Shell task with DIR command based on the configuration")]
+     [DeploymentItem(@"ConfigFiles")]
+     public void CanInjectAndExecuteTask()
+     {
+         // Instantiating ShellTask here enforces that the shell task dll is copied to the output directory.
+         ShellTask assemblyAnchor = new ShellTask("invalid");
+         Assert.IsNotNull(assemblyAnchor);
+
+         string workingDirectory = Directory.GetCurrentDirectory();
+         Assert.IsNotNull(workingDirectory);
+
+         AvroConfigurationSerializer avroSerializer = new AvroConfigurationSerializer();
+         AvroConfiguration evaluatorConfiguration = avroSerializer.AvroDeseriaizeFromFile("evaluator.conf");
+         Assert.IsNotNull(evaluatorConfiguration);
+
+         ICsConfigurationBuilder builder = TangFactory.GetTang().NewConfigurationBuilder();
+         var taskModule = TaskConfiguration.ConfigurationModule
+             .Set(TaskConfiguration.Identifier, "Test_CLRContext_task")
+             .Set(TaskConfiguration.Task, GenericType<ShellTask>.Class)
+             .Build();
+         builder.AddConfiguration(taskModule);
+         builder.BindNamedParameter<ShellTask.Command, string>(GenericType<ShellTask.Command>.Class, "dir");
+
+         IConfiguration builtConfiguration = builder.Build();
+         string serializedTaskConfiguration = avroSerializer.ToString(builtConfiguration);
+
+         TaskConfiguration parsedConfiguration = new TaskConfiguration(serializedTaskConfiguration);
+         Assert.IsNotNull(parsedConfiguration);
+
+         ITask shellTask = InjectTask(parsedConfiguration, serializedTaskConfiguration);
+
+         byte[] rawOutput = shellTask.Call(null);
+         string commandOutput = System.Text.Encoding.Default.GetString(rawOutput);
+
+         // The dir command is executed in the container directory, which includes the file "evaluator.conf"
+         bool listsConfigurationFile = commandOutput.Contains("evaluator.conf");
+         Assert.IsTrue(listsConfigurationFile);
+     }
+
+     /// <summary>
+     /// Parses a driver information JSON document and checks the remote id, the
+     /// start time and the name server id derived from the service list.
+     /// </summary>
+     [TestMethod, Priority(0), TestCategory("Unit")]
+     [Description("Test driver information extacted from Http server")]
+     public void CanExtractDriverInformaiton()
+     {
+         const string DriverInfoJson = "{\"remoteId\":\"socket://10.121.136.231:14272\",\"startTime\":\"2014 08 28 10:50:32\",\"services\":[{\"serviceName\":\"NameServer\",\"serviceInfo\":\"10.121.136.231:16663\"}]}";
+         AvroDriverInfo driverInfo = AvroJsonSerializer<AvroDriverInfo>.FromString(DriverInfoJson);
+
+         bool remoteIdMatches = driverInfo.remoteId.Equals("socket://10.121.136.231:14272");
+         Assert.IsTrue(remoteIdMatches);
+
+         bool startTimeMatches = driverInfo.startTime.Equals("2014 08 28 10:50:32");
+         Assert.IsTrue(startTimeMatches);
+
+         var driverInformation = new DriverInformation(driverInfo.remoteId, driverInfo.startTime, driverInfo.services);
+         bool nameServerMatches = driverInformation.NameServerId.Equals("10.121.136.231:16663");
+         Assert.IsTrue(nameServerMatches);
+     }
+
+     /// <summary>
+     /// Creates an injector from the Tang configuration of <paramref name="config"/>
+     /// and resolves the ITask instance from it.
+     /// </summary>
+     /// <param name="config">The parsed task configuration</param>
+     /// <param name="taskConfig">The serialized configuration, used in the error message only</param>
+     /// <returns>The injected task</returns>
+     /// <exception cref="InvalidOperationException">Injection failed for any reason</exception>
+     private static ITask InjectTask(TaskConfiguration config, string taskConfig)
+     {
+         try
+         {
+             IInjector injector = TangFactory.GetTang().NewInjector(config.TangConfig);
+             return (ITask)injector.GetInstance(typeof(ITask));
+         }
+         catch (Exception e)
+         {
+             throw new InvalidOperationException("unable to inject task with configuration: " + taskConfig, e);
+         }
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestBridgeClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestBridgeClient.cs b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestBridgeClient.cs
new file mode 100644
index 0000000..f0785f9
--- /dev/null
+++ b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestBridgeClient.cs
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Driver;
+using Org.Apache.Reef.Utilities.Logging;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+
+namespace Org.Apache.Reef.Test
+{
+    /// <summary>
+    /// Functional tests that launch the CLR bridge client executable as a child process
+    /// and require it to exit with code 0.
+    /// </summary>
+    [TestClass]
+    public class TestBridgeClient : ReefFunctionalTest
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TestBridgeClient));
+
+        [TestInitialize()]
+        public void TestSetup()
+        {
+            CleanUp();
+            Init();
+        }
+
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            Console.WriteLine("Post test check and clean up");
+            CleanUp();
+        }
+
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Run CLR Bridge on YARN")]
+        [DeploymentItem(@".")]
+        [Ignore] // This is disabled by default on builds
+        public void CanRunClrBridgeOnYarn()
+        {
+            RunClrBridgeClient(runOnYarn: true);
+        }
+
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Run CLR Bridge on local runtime")]
+        [DeploymentItem(@".")]
+        [Timeout(180 * 1000)]
+        public void CanRunClrBridgeOnLocalRuntime()
+        {
+            IsOnLocalRuntiime = true;
+            RunClrBridgeClient(runOnYarn: false);
+            ValidateSuccessForLocalRuntime(2);
+        }
+
+        /// <summary>
+        /// Starts the CLR bridge client executable, waits for it to finish and
+        /// fails if it returns a non-zero exit code.
+        /// </summary>
+        /// <param name="runOnYarn">Passed to the client as its first command line argument.</param>
+        /// <exception cref="InvalidOperationException">The client exited with a non-zero exit code.</exception>
+        private void RunClrBridgeClient(bool runOnYarn)
+        {
+            const string clrBridgeClient = "Org.Apache.Reef.CLRBridgeClient.exe";
+            List<string> arguments = new List<string>();
+            arguments.Add(runOnYarn.ToString());
+            arguments.Add(Constants.BridgeLaunchClass);
+            arguments.Add(".");
+            arguments.Add(Path.Combine(_binFolder, Constants.BridgeJarFileName));
+            arguments.Add(Path.Combine(_binFolder, _cmdFile));
+
+            ProcessStartInfo startInfo = new ProcessStartInfo()
+            {
+                FileName = clrBridgeClient,
+                Arguments = string.Join(" ", arguments),
+                RedirectStandardOutput = true,
+                UseShellExecute = false,
+                CreateNoWindow = false
+            };
+
+            LOGGER.Log(Level.Info, "executing\r\n" + startInfo.FileName + "\r\n" + startInfo.Arguments);
+            using (Process process = Process.Start(startInfo))
+            {
+                // Standard output is redirected, so it must be drained BEFORE waiting for exit:
+                // otherwise the child blocks once the pipe buffer is full and WaitForExit never returns.
+                string output = process.StandardOutput.ReadToEnd();
+                process.WaitForExit();
+                LOGGER.Log(Level.Info, "CLR client output\r\n" + output);
+
+                if (process.ExitCode != 0)
+                {
+                    throw new InvalidOperationException("CLR client exited with error code " + process.ExitCode);
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestHelloBridgeHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestHelloBridgeHandlers.cs b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestHelloBridgeHandlers.cs
new file mode 100644
index 0000000..7e896ec
--- /dev/null
+++ b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestHelloBridgeHandlers.cs
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using Org.Apache.Reef.Driver.Bridge;
+using Org.Apache.Reef.Driver.Defaults;
+using Org.Apache.Reef.Examples.HelloCLRBridge;
+using Org.Apache.Reef.Examples.HelloCLRBridge.Handlers;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Util;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace Org.Apache.Reef.Test
+{
+    /// <summary>
+    /// Functional test that runs the Hello CLR bridge handlers through <c>TestRun</c>
+    /// and then checks the resulting log output.
+    /// </summary>
+    [TestClass]
+    public class TestHelloBridgeHandlers : ReefFunctionalTest
+    {
+        [TestInitialize()]
+        public void TestSetup()
+        {
+            CleanUp();
+            Init();
+        }
+
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            Console.WriteLine("Post test check and clean up");
+            CleanUp();
+        }
+
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Test Hello Handler on local runtime")]
+        [DeploymentItem(@".")]
+        [Timeout(180 * 1000)]
+        public void RunHelloHandlerOnLocalRuntime()
+        {
+            IsOnLocalRuntiime = true;
+
+            HashSet<string> assemblies = AssembliesToCopy();
+            IConfiguration driverConfiguration = DriverConfiguration();
+            TestRun(assemblies, driverConfiguration);
+
+            ValidateSuccessForLocalRuntime(2);
+            ValidateEvaluatorSetting();
+        }
+
+        /// <summary>
+        /// Builds the driver bridge configuration that binds the Hello* handlers,
+        /// the default trace listener, the Info trace level and the
+        /// "submitContextAndTask" command line argument.
+        /// </summary>
+        /// <returns>The built driver configuration.</returns>
+        public IConfiguration DriverConfiguration()
+        {
+            // Note: OnEvaluatorAllocated is deliberately set twice, once per handler type.
+            var module = DriverBridgeConfiguration.ConfigurationModule
+                .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<HelloStartHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<HelloAllocatedEvaluatorHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<AnotherHelloAllocatedEvaluatorHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnContextActive, GenericType<HelloActiveContextHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnTaskMessage, GenericType<HelloTaskMessageHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<HelloFailedEvaluatorHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnTaskFailed, GenericType<HelloFailedTaskHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnTaskRunning, GenericType<HelloRunningTaskHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<HelloEvaluatorRequestorHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnHttpEvent, GenericType<HelloHttpHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorCompleted, GenericType<HelloCompletedEvaluatorHandler>.Class)
+                .Set(DriverBridgeConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class)
+                .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                .Set(DriverBridgeConfiguration.CommandLineArguments, "submitContextAndTask");
+
+            return module.Build();
+        }
+
+        /// <summary>
+        /// Collects the names of the assemblies that define the start handler and the Hello task.
+        /// </summary>
+        /// <returns>The set of assembly names handed to <c>TestRun</c>.</returns>
+        public HashSet<string> AssembliesToCopy()
+        {
+            return new HashSet<string>
+            {
+                typeof(HelloStartHandler).Assembly.GetName().Name,
+                typeof(HelloTask).Assembly.GetName().Name
+            };
+        }
+
+        /// <summary>
+        /// Asserts that at least one line of the stdout log file contains the
+        /// expected evaluator memory/core message.
+        /// </summary>
+        private void ValidateEvaluatorSetting()
+        {
+            const string expectedMessage = "Evaluator is assigned with 512 MB of memory and 2 cores.";
+            string logPath = GetLogFile(_stdout);
+            bool messageLogged = File.ReadAllLines(logPath).Any(line => line.Contains(expectedMessage));
+            Assert.IsTrue(messageLogged);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestSimpleEventHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestSimpleEventHandlers.cs b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestSimpleEventHandlers.cs
new file mode 100644
index 0000000..fb8a011
--- /dev/null
+++ b/lang/cs/Tests/ReefTests/Functional.Tests/Bridge/TestSimpleEventHandlers.cs
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Driver.Bridge;
+using Org.Apache.Reef.Driver.Defaults;
+using Org.Apache.Reef.Examples.HelloCLRBridge;
+using Org.Apache.Reef.Examples.HelloCLRBridge.handlers;
+using Org.Apache.Reef.Examples.HelloCLRBridge.Handlers;
+using Org.Apache.Reef.IO.Network.Naming;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Util;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace Org.Apache.Reef.Test
+{
+    /// <summary>
+    /// Functional test that runs the driver with <c>HelloSimpleEventHandlers</c> bound to most
+    /// driver events, plus the driver-restart handlers, and then checks the resulting log output.
+    /// </summary>
+    [TestClass]
+    public class TestSimpleEventHandlers : ReefFunctionalTest
+    {
+        [TestInitialize()]
+        public void TestSetup()
+        {
+            CleanUp();
+            Init();
+        }
+
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            Console.WriteLine("Post test check and clean up");
+            CleanUp();
+        }
+
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Test simple event handlers on local runtime")]
+        [DeploymentItem(@".")]
+        [Timeout(180 * 1000)]
+        [Ignore] // Still disabled: previously switched off by commenting out [TestMethod]; [Ignore] keeps it visible as skipped
+        public void RunSimpleEventHandlerOnLocalRuntime()
+        {
+            IsOnLocalRuntiime = true;
+            TestRun(AssembliesToCopy(), DriverConfiguration());
+            ValidateSuccessForLocalRuntime(2);
+            ValidateEvaluatorSetting();
+        }
+
+        /// <summary>
+        /// Builds the driver bridge configuration used by this test: event handlers,
+        /// trace listener and level, the "submitContextAndTask" command line argument
+        /// and the driver-restart handlers.
+        /// </summary>
+        /// <returns>The built driver configuration.</returns>
+        public IConfiguration DriverConfiguration()
+        {
+            // Note: OnEvaluatorAllocated is deliberately set twice, once per handler type.
+            return DriverBridgeConfiguration.ConfigurationModule
+                .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<AnotherHelloAllocatedEvaluatorHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnContextActive, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnTaskMessage, GenericType<HelloTaskMessageHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnTaskCompleted, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnTaskFailed, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnTaskRunning, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnHttpEvent, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorCompleted, GenericType<HelloSimpleEventHandlers>.Class)
+                .Set(DriverBridgeConfiguration.CustomTraceListeners, GenericType<DefaultCustomTraceListener>.Class)
+                .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                .Set(DriverBridgeConfiguration.CommandLineArguments, "submitContextAndTask")
+                .Set(DriverBridgeConfiguration.OnDriverRestarted, GenericType<HelloRestartHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnDriverReconnect, GenericType<DefaultLocalHttpDriverConnection>.Class)
+                .Set(DriverBridgeConfiguration.OnDirverRestartContextActive, GenericType<HelloDriverRestartActiveContextHandler>.Class)
+                .Set(DriverBridgeConfiguration.OnDriverRestartTaskRunning, GenericType<HelloDriverRestartRunningTaskHandler>.Class)
+                .Build();
+        }
+
+        /// <summary>
+        /// Collects the names of the assemblies that define the simple event handlers,
+        /// the Hello task and the name server interface.
+        /// </summary>
+        /// <returns>The set of assembly names handed to <c>TestRun</c>.</returns>
+        public HashSet<string> AssembliesToCopy()
+        {
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(HelloSimpleEventHandlers).Assembly.GetName().Name);
+            appDlls.Add(typeof(HelloTask).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameServer).Assembly.GetName().Name);
+            return appDlls;
+        }
+
+        /// <summary>
+        /// Asserts that at least one line of the stdout log file contains the
+        /// expected evaluator memory/core message.
+        /// </summary>
+        private void ValidateEvaluatorSetting()
+        {
+            const string successIndication = "Evaluator is assigned with 512 MB of memory and 2 cores.";
+            string[] lines = File.ReadAllLines(GetLogFile(_stdout));
+
+            // Any() stops at the first match; no need to materialize and count every matching line.
+            Assert.IsTrue(lines.Any(s => s.Contains(successIndication)));
+        }
+    }
+}
\ No newline at end of file