You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:42:54 UTC
[10/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code
base
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Protobuf/WakeRemoteProtosGen.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Protobuf/WakeRemoteProtosGen.cs b/lang/cs/Source/WAKE/Wake/Protobuf/WakeRemoteProtosGen.cs
new file mode 100644
index 0000000..e4b3f2b
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Protobuf/WakeRemoteProtosGen.cs
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//------------------------------------------------------------------------------
+// <auto-generated>
+// This code was generated by a tool.
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: src/main/proto/RemoteProtocol.proto
+namespace Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos
+{
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeMessagePBuf")]
+ public partial class WakeMessagePBuf : global::ProtoBuf.IExtensible
+ {
+ public WakeMessagePBuf() {}
+
+ private byte[] _data;
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] data
+ {
+ get { return _data; }
+ set { _data = value; }
+ }
+ private long _seq;
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"seq", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public long seq
+ {
+ get { return _seq; }
+ set { _seq = value; }
+ }
+ private string _source = "";
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"source", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string source
+ {
+ get { return _source; }
+ set { _source = value; }
+ }
+ private string _sink = "";
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"sink", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string sink
+ {
+ get { return _sink; }
+ set { _sink = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"WakeTuplePBuf")]
+ public partial class WakeTuplePBuf : global::ProtoBuf.IExtensible
+ {
+ public WakeTuplePBuf() {}
+
+ private string _className;
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"className", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string className
+ {
+ get { return _className; }
+ set { _className = value; }
+ }
+ private byte[] _data;
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"data", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] data
+ {
+ get { return _data; }
+ set { _data = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/AbstractObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/AbstractObserver.cs b/lang/cs/Source/WAKE/Wake/RX/AbstractObserver.cs
new file mode 100644
index 0000000..b452a24
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/AbstractObserver.cs
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+ /// <summary>
+ /// An observer with logging-only onError and onCompleted() methods.
+ /// </summary>
+ /// <typeparam name="T">The observer type</typeparam>
+ public abstract class AbstractObserver<T> : IObserver<T>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(AbstractObserver<T>));
+
+ public virtual void OnError(Exception error)
+ {
+ LOGGER.Log(Level.Info, "The observer " + GetType() + "has received an Exception: " + error);
+ }
+
+ public virtual void OnCompleted()
+ {
+ LOGGER.Log(Level.Verbose, "The observer " + GetType() + "has received an onCompleted() ");
+ }
+
+ public abstract void OnNext(T arg1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/AbstractRxStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/AbstractRxStage.cs b/lang/cs/Source/WAKE/Wake/RX/AbstractRxStage.cs
new file mode 100644
index 0000000..20c7431
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/AbstractRxStage.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+ /// <summary>
+ /// An Rx stage that implements metering
+ /// </summary>
+ public abstract class AbstractRxStage<T> : IRxStage<T>
+ {
+ //protected internal readonly Meter meter;
+
+ /// <summary>Constructs an abstact rxstage</summary>
+ /// <param name="meterName">the name of the meter</param>
+ public AbstractRxStage(string meterName)
+ {
+ //meter = new Meter(meterName);
+ }
+
+ /// <summary>Updates the meter</summary>
+ /// <param name="value">the event</param>
+ public virtual void OnNext(T value)
+ {
+ //meter.Mark(1);
+ }
+
+ public abstract void OnCompleted();
+
+ public abstract void OnError(Exception error);
+
+ public virtual void Dispose()
+ {
+ // no op
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/IRxStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/IRxStage.cs b/lang/cs/Source/WAKE/Wake/RX/IRxStage.cs
new file mode 100644
index 0000000..e756328
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/IRxStage.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+ /// <summary>Stage that executes the observer</summary>
+ public interface IRxStage<T> : IObserver<T>, IStage
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/IStaticObservable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/IStaticObservable.cs b/lang/cs/Source/WAKE/Wake/RX/IStaticObservable.cs
new file mode 100644
index 0000000..577db4d
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/IStaticObservable.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+ public interface IStaticObservable
+ {
+ //intentionally empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/ISubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/ISubject.cs b/lang/cs/Source/WAKE/Wake/RX/ISubject.cs
new file mode 100644
index 0000000..3679470
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/ISubject.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+ /// <summary>A class implementing Observer> and StaticObservable</summary>
+ /// <typeparam name="In">The in type</typeparam>
+ /// <typeparam name="Out">The out type</typeparam>
+ public interface ISubject<In, Out> : IObserver<In>, IStaticObservable
+ {
+ // intentionally empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs
new file mode 100644
index 0000000..d513020
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/PubSubSubject.cs
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Subjects;
+using System.Reflection;
+using System.Text;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+ /// <summary>
+ /// Subject to provide publish/subscribe interface.
+ /// Subscribes to class Types and invokes handlers for a given
+ /// type on call to OnNext
+ /// </summary>
+ /// <typeparam name="T">The super type that all event types
+ /// inherit from</typeparam>
+ public class PubSubSubject<T> : IObserver<T>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(PubSubSubject<T>));
+
+ private Dictionary<Type, List<object>> _classToObserversMap;
+ private bool _completed;
+ private object _mutex;
+
+ /// <summary>
+ /// Constructs a pub-sub Subject
+ /// </summary>
+ public PubSubSubject()
+ {
+ _classToObserversMap = new Dictionary<Type, List<object>>();
+ _mutex = new object();
+ }
+
+ /// <summary>
+ /// Log on completion
+ /// </summary>
+ public void OnCompleted()
+ {
+ lock (_mutex)
+ {
+ _completed = true;
+ }
+ }
+
+ /// <summary>
+ /// Log Exception
+ /// </summary>
+ /// <param name="error"></param>
+ public void OnError(Exception error)
+ {
+ lock (_mutex)
+ {
+ _completed = true;
+ }
+ }
+
+ /// <summary>
+ /// Invoke the subscribed handlers for the event class type
+ /// </summary>
+ /// <param name="value">The event to process</param>
+ public void OnNext(T value)
+ {
+ if (value == null)
+ {
+ Exceptions.Throw(new ArgumentNullException("value"), LOGGER);
+ }
+
+ lock (_mutex)
+ {
+ // If OnCompleted or OnError called, do nothing
+ if (_completed)
+ {
+ return;
+ }
+
+ // Check that the event type has been subscribed
+ List<object> handlers;
+ if (!_classToObserversMap.TryGetValue(value.GetType(), out handlers))
+ {
+ Exceptions.Throw(new ArgumentException("No event for type " + value.GetType()), LOGGER);
+ }
+
+ // Invoke each IObserver for the event type
+ foreach (object handler in handlers)
+ {
+ Type handlerType = typeof(IObserver<>).MakeGenericType(new[] { value.GetType() });
+ MethodInfo info = handlerType.GetMethod("OnNext");
+ info.Invoke(handler, new[] { (object) value });
+ }
+ }
+ }
+
+ /// <summary>
+ /// Subscribe an IObserver for an event type
+ /// </summary>
+ /// <typeparam name="U">The event type</typeparam>
+ /// <param name="observer">The observer to handle the event</param>
+ /// <returns>An IDisposable object used to handle unsubscribing
+ /// the IObserver</returns>
+ public IDisposable Subscribe<U>(IObserver<U> observer) where U : T
+ {
+ lock (_mutex)
+ {
+ List<object> observers;
+ if (!_classToObserversMap.TryGetValue(typeof(U), out observers))
+ {
+ observers = new List<object>();
+ _classToObserversMap[typeof(U)] = observers;
+ }
+ observers.Add(observer);
+ }
+
+ return new DisposableResource<U>(_classToObserversMap, observer, _mutex);
+ }
+
+ /// <summary>
+ /// Utility class to handle disposing of an IObserver
+ /// </summary>
+ private class DisposableResource<U> : IDisposable
+ {
+ private Dictionary<Type, List<object>> _observersMap;
+ private IObserver<U> _observer;
+ private object _mutex;
+ private bool _disposed;
+
+ public DisposableResource(Dictionary<Type, List<object>> observersMap, IObserver<U> observer, object mutex)
+ {
+ _observersMap = observersMap;
+ _observer = observer;
+ _mutex = mutex;
+ _disposed = false;
+ }
+
+ /// <summary>
+ /// Unsubscribe the IObserver from the observer map
+ /// </summary>
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ UnsubscribeObserver();
+ _disposed = true;
+ }
+ }
+
+ private void UnsubscribeObserver()
+ {
+ lock (_mutex)
+ {
+ List<object> observers;
+ if (_observersMap.TryGetValue(typeof(U), out observers))
+ {
+ observers.Remove(_observer);
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs
new file mode 100644
index 0000000..4803f89
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/RxSyncStage.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+ /// <summary>Stage that executes the observer synchronously</summary>
+ public class RxSyncStage<T> : AbstractRxStage<T>
+ {
+ private readonly IObserver<T> _observer;
+
+ /// <summary>Constructs a Rx synchronous stage</summary>
+ /// <param name="observer">the observer</param>
+ public RxSyncStage(IObserver<T> observer) : base(observer.GetType().FullName)
+ {
+ _observer = observer;
+ }
+
+ /// <summary>Provides the observer with the new value</summary>
+ /// <param name="value">the new value</param>
+ public override void OnNext(T value)
+ {
+ base.OnNext(value);
+ _observer.OnNext(value);
+ }
+
+ /// <summary>
+ /// Notifies the observer that the provider has experienced an error
+ /// condition.
+ /// </summary>
+ /// <param name="error">the error</param>
+ public override void OnError(Exception error)
+ {
+ _observer.OnError(error);
+ }
+
+ /// <summary>
+ /// Notifies the observer that the provider has finished sending push-based
+ /// notifications.
+ /// </summary>
+ public override void OnCompleted()
+ {
+ _observer.OnCompleted();
+ }
+
+ /// <summary>
+ /// Closes the stage
+ /// </summary>
+ public override void Dispose()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs
new file mode 100644
index 0000000..4986055
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/RxThreadPoolStage.cs
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Util;
+using System;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+ /// <summary>Stage that executes the observer with a thread pool</summary>
+ public class RxThreadPoolStage<T> : AbstractRxStage<T>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(RxThreadPoolStage<T>));
+
+ private readonly IObserver<T> _observer;
+
+ private readonly ITaskService _taskService;
+
+ /// <summary>Constructs a Rx thread pool stage</summary>
+ /// <param name="observer">the observer to execute</param>
+ /// <param name="numThreads">the number of threads</param>
+ public RxThreadPoolStage(IObserver<T> observer, int numThreads)
+ : base(observer.GetType().FullName)
+ {
+ _observer = observer;
+ if (numThreads <= 0)
+ {
+ Exceptions.Throw(new WakeRuntimeException("numThreads " + numThreads + " is less than or equal to 0"), LOGGER);
+ }
+ _taskService = new FixedThreadPoolTaskService(numThreads);
+ }
+
+ /// <summary>Provides the observer with the new value</summary>
+ /// <param name="value">the new value</param>
+ public override void OnNext(T value)
+ {
+ base.OnNext(value);
+ _taskService.Execute(new _Startable_58(this, value).Start);
+ }
+
+ /// <summary>
+ /// Notifies the observer that the provider has experienced an error
+ /// condition.
+ /// </summary>
+ /// <param name="error">the error</param>
+ public override void OnError(Exception error)
+ {
+ _taskService.Execute(new _Startable_75(this, error).Start);
+ }
+
+ /// <summary>
+ /// Notifies the observer that the provider has finished sending push-based
+ /// notifications.
+ /// </summary>
+ public override void OnCompleted()
+ {
+ _taskService.Execute(new _Startable_91(this).Start);
+ }
+
+ /// <summary>
+ /// Closes the stage
+ /// </summary>
+ public override void Dispose()
+ {
+ _taskService.Shutdown();
+ }
+
+ private sealed class _Startable_58 : IStartable
+ {
+ private readonly RxThreadPoolStage<T> _enclosing;
+ private readonly T _value;
+
+ public _Startable_58(RxThreadPoolStage<T> enclosing, T value)
+ {
+ _enclosing = enclosing;
+ _value = value;
+ }
+
+ public void Start()
+ {
+ _enclosing._observer.OnNext(_value);
+ }
+ }
+
+ private sealed class _Startable_75 : IStartable
+ {
+ private readonly RxThreadPoolStage<T> _enclosing;
+ private readonly Exception _error;
+
+ public _Startable_75(RxThreadPoolStage<T> enclosing, Exception error)
+ {
+ _enclosing = enclosing;
+ _error = error;
+ }
+
+ public void Start()
+ {
+ _enclosing._observer.OnError(_error);
+ }
+ }
+
+ private sealed class _Startable_91 : IStartable
+ {
+ private readonly RxThreadPoolStage<T> _enclosing;
+
+ public _Startable_91(RxThreadPoolStage<T> enclosing)
+ {
+ _enclosing = enclosing;
+ }
+
+ public void Start()
+ {
+ _enclosing._observer.OnCompleted();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs
new file mode 100644
index 0000000..44dd77c
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/RxTimerStage.cs
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Timers;
+
+using Org.Apache.Reef.Wake.Impl;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+ /// <summary>Timer stage that provides events to the observer periodically</summary>
+ public class RxTimerStage : IStage, IStaticObservable
+ {
+ private readonly Timer _timer;
+ private readonly PeriodicEvent _value = new PeriodicEvent();
+ private readonly IObserver<PeriodicEvent> _observer;
+
+ /// <summary>Constructs a Rx timer stage</summary>
+ /// <param name="observer">the observer</param>
+ /// <param name="period">the period in milli-seconds</param>
+ public RxTimerStage(IObserver<PeriodicEvent> observer, long period)
+ : this(observer, 0, period)
+ {
+ }
+
+ /// <summary>Constructs a Rx timer stage</summary>
+ /// <param name="observer">the observer</param>
+ /// <param name="initialDelay">the initial delay in milli-seconds</param>
+ /// <param name="period">the period in milli-seconds</param>
+ public RxTimerStage(IObserver<PeriodicEvent> observer, long initialDelay, long period)
+ {
+ _observer = observer;
+ _timer = new Timer(period);
+ _timer.Elapsed += (sender, e) => OnTimedEvent(sender, e, _observer, _value);
+ _timer.Enabled = true;
+ }
+
+ /// <summary>
+ /// Closes the stage
+ /// </summary>
+ public void Dispose()
+ {
+ _timer.Stop();
+ }
+
+ private static void OnTimedEvent(object source, ElapsedEventArgs e, IObserver<PeriodicEvent> observer, PeriodicEvent value)
+ {
+ observer.OnNext(value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs b/lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs
new file mode 100644
index 0000000..418dc98
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/Impl/SimpleSubject.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX.Impl
+{
+ /// <summary>A Subject that relays all messages to its subscribers.</summary>
+ public class SimpleSubject<T> : ISubject<T, T>
+ {
+ private readonly IObserver<T> _observer;
+
+ /// <summary>Constructs a simple subject</summary>
+ /// <param name="observer">the observer</param>
+ public SimpleSubject(IObserver<T> observer)
+ {
+ _observer = observer;
+ }
+
+ /// <summary>Provides the observer with the new value</summary>
+ /// <param name="value">the new value</param>
+ public virtual void OnNext(T value)
+ {
+ _observer.OnNext(value);
+ }
+
+ /// <summary>Provides the observer with the error</summary>
+ /// <param name="error">the error</param>
+ public virtual void OnError(Exception error)
+ {
+ _observer.OnError(error);
+ }
+
+ /// <summary>
+ /// Provides the observer with it has finished sending push-based
+ /// notifications.
+ /// </summary>
+ public virtual void OnCompleted()
+ {
+ _observer.OnCompleted();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs b/lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs
new file mode 100644
index 0000000..b562055
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/RX/ObserverCompletedException.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.RX
+{
+ /// <summary>
+ /// It is illegal to call onError() or onCompleted() when a call to onNext() is
+ /// still outstanding, or to call onNext(), onError() or onCompleted() after a
+ /// call to onError() or onCompleted() has been dispatched.
+ /// </summary>
+ /// <remarks>
+ /// It is illegal to call onError() or onCompleted() when a call to onNext() is
+ /// still outstanding, or to call onNext(), onError() or onCompleted() after a
+ /// call to onError() or onCompleted() has been dispatched. Observers may throw
+ /// an ObserverCompleted exception whenever this API is violated. Violating the
+ /// API leaves the Observer (and any resources that it holds) in an undefined
+ /// state, and throwing ObserverCompleted exceptions is optional.
+ /// Callers receiving this exception should simply pass it up the stack to the
+ /// Aura runtime. They should not attempt to forward it on to upstream or
+ /// downstream stages. The easiest way to do this is to ignore the exception
+ /// entirely.
+ /// </remarks>
+ [System.Serializable]
+ public class ObserverCompletedException : InvalidOperationException
+ {
+ private const long serialVersionUID = 1L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/ICodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/ICodec.cs b/lang/cs/Source/WAKE/Wake/Remote/ICodec.cs
new file mode 100644
index 0000000..25f0bce
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/ICodec.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ public interface ICodec
+ {
+ }
+
+ /// <summary>
+ /// Interface for serialization routines that translate back and forth between
+ /// byte arrays with low latency.
+ /// </summary>
+ /// <typeparam name="T">The codec type</typeparam>
+ public interface ICodec<T> : ICodec, IEncoder<T>, IDecoder<T>
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs
new file mode 100644
index 0000000..6ba2805
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/ICodecFactory.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Remote.Impl;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ [DefaultImplementation(typeof(ByteCodecFactory))]
+ public interface ICodecFactory
+ {
+ object Create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs
new file mode 100644
index 0000000..ddc72c8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IDecoder.cs
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ public interface IDecoder
+ {
+ }
+
+ /// <summary>
+ /// Interface for serialization routines that translate back and forth between
+ /// byte arrays with low latency.
+ /// </summary>
+ /// <typeparam name="T">The decoder type</typeparam>
+ public interface IDecoder<T> : IDecoder
+ {
+ /// <summary>Decodes the given byte array into an object</summary>
+ /// <param name="data"></param>
+ /// <returns>the decoded object</returns>
+ T Decode(byte[] data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs
new file mode 100644
index 0000000..b2a743f
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IEncoder.cs
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ public interface IEncoder
+ {
+ }
+
+ /// <summary>
+ /// Interface for serialization routines that translate back and forth between
+ /// byte arrays with low latency.
+ /// </summary>
+ /// <typeparam name="T">The encoder type</typeparam>
+ public interface IEncoder<T> : IEncoder
+ {
+ /// <summary>Encodes the given object into a Byte Array</summary>
+ /// <param name="obj"></param>
+ /// <returns>a byte[] representation of the object</returns>
+ byte[] Encode(T obj);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/ILink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/ILink.cs b/lang/cs/Source/WAKE/Wake/Remote/ILink.cs
new file mode 100644
index 0000000..b25ef4f
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/ILink.cs
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Wake.Remote.Impl;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ /// <summary>
+ /// Represents a link between two endpoints
+ /// </summary>
+ public interface ILink<T> : IDisposable
+ {
+ /// <summary>
+ /// Returns the local socket address
+ /// </summary>
+ IPEndPoint LocalEndpoint { get; }
+
+ /// <summary>
+ /// Returns the remote socket address
+ /// </summary>
+ IPEndPoint RemoteEndpoint { get; }
+
+ /// <summary>
+ /// Writes the value to this link asynchronously
+ /// </summary>
+ /// <param name="value">The data to write</param>
+ /// <param name="token">The cancellation token</param>
+ Task WriteAsync(T value, CancellationToken token);
+
+ /// <summary>
+ /// Writes the value to this link synchronously
+ /// </summary>
+ /// <param name="value">The data to write</param>
+ void Write(T value);
+
+ /// <summary>
+ /// Reads the value from this link asynchronously
+ /// </summary>
+ /// <returns>The read data</returns>
+ /// <param name="token">The cancellation token</param>
+ Task<T> ReadAsync(CancellationToken token);
+
+ /// <summary>
+ /// Reads the value from this link synchronously
+ /// </summary>
+ /// <returns>The read data</returns>
+ T Read();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs
new file mode 100644
index 0000000..d693401
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteEvent.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ public interface IRemoteEvent<T>
+ {
+ IPEndPoint LocalEndPoint { get; set; }
+
+ IPEndPoint RemoteEndPoint { get; set; }
+
+ string Source { get; }
+
+ string Sink { get; }
+
+ T Value { get; }
+
+ long Sequence { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs
new file mode 100644
index 0000000..1101774
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifier.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ /// <summary>
+ /// An identifier that represents a remote source
+ /// </summary>
+ public abstract class IRemoteIdentifier : IIdentifier
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs
new file mode 100644
index 0000000..fdea1e4
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteIdentifierFactory.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ /// <summary>Factory that creates a RemoteIdentifier</summary>
+ public interface IRemoteIdentifierFactory : IIdentifierFactory
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteManager.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteManager.cs
new file mode 100644
index 0000000..a572b04
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteManager.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using Org.Apache.Reef.Wake.Remote.Impl;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ public interface IRemoteManager<T> : IStage
+ {
+ IRemoteIdentifier Identifier { get; }
+
+ IPEndPoint LocalEndpoint { get; }
+
+ IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> dest);
+
+ IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint);
+
+ IDisposable RegisterObserver(RemoteEventEndPoint<T> source, IObserver<T> theObserver);
+
+ IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> theObserver);
+
+ IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> theObserver);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs b/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs
new file mode 100644
index 0000000..4b3d2a3
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/IRemoteMessage.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ /// <summary>
+ /// Message received from a remote handler
+ /// </summary>
+ public interface IRemoteMessage<T>
+ {
+ /// <summary>
+ /// Returns a remote identifier of the sender
+ /// </summary>
+ /// <returns>The remote identifier</returns>
+ IRemoteIdentifier Identifier { get; }
+
+ /// <summary>
+ /// Returns an actual message
+ /// </summary>
+ /// <returns>The remote message</returns>
+ T Message { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs b/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs
new file mode 100644
index 0000000..8d859e2
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/ISubscriptionManager.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+ public interface ISubscriptionManager
+ {
+ void Unsubscribe(object token);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs
new file mode 100644
index 0000000..e596ab7
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodec.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ public class ByteCodec : ICodec<byte[]>
+ {
+ [Inject]
+ public ByteCodec()
+ {
+ }
+
+ public byte[] Encode(byte[] obj)
+ {
+ return obj;
+ }
+
+ public byte[] Decode(byte[] data)
+ {
+ return data;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs
new file mode 100644
index 0000000..333f341
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/ByteCodecFactory.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ public class ByteCodecFactory : ICodecFactory
+ {
+ [Inject]
+ public ByteCodecFactory()
+ {
+ }
+
+ public object Create()
+ {
+ return new ByteCodec();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
new file mode 100644
index 0000000..184da8a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using System.Linq;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Performs low level network IO operations between hosts
+ /// </summary>
+ public class Channel
+ {
+ private NetworkStream _stream;
+
+ /// <summary>
+ /// Constructs a new Channel with the the connected NetworkStream.
+ /// </summary>
+ /// <param name="stream">The connected stream</param>
+ public Channel(NetworkStream stream)
+ {
+ if (stream == null)
+ {
+ throw new ArgumentNullException("stream");
+ }
+
+ _stream = stream;
+ }
+
+ /// <summary>
+ /// Sends a message to the connected client synchronously
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ public void Write(byte[] message)
+ {
+ if (message == null)
+ {
+ throw new ArgumentNullException("message");
+ }
+
+ byte[] messageBuffer = GenerateMessageBuffer(message);
+ _stream.Write(messageBuffer, 0, messageBuffer.Length);
+ }
+
+ /// <summary>
+ /// Sends a message to the connected client asynchronously
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ /// <param name="token">The cancellation token</param>
+ /// <returns>The awaitable write task</returns>
+ public async Task WriteAsync(byte[] message, CancellationToken token)
+ {
+ byte[] messageBuffer = GenerateMessageBuffer(message);
+ await _stream.WriteAsync(messageBuffer, 0, messageBuffer.Length, token);
+ }
+
+ /// <summary>
+ /// Reads an incoming message as a byte array synchronously.
+ /// The message length is read as the first four bytes.
+ /// </summary>
+ /// <returns>The byte array message</returns>
+ public byte[] Read()
+ {
+ int payloadLength = ReadMessageLength();
+ if (payloadLength == 0)
+ {
+ return null;
+ }
+
+ return ReadBytes(payloadLength);
+ }
+
+ /// <summary>
+ /// Reads an incoming message as a byte array asynchronously.
+ /// The message length is read as the first four bytes.
+ /// </summary>
+ /// <param name="token">The cancellation token</param>
+ /// <returns>The byte array message</returns>
+ public async Task<byte[]> ReadAsync(CancellationToken token)
+ {
+ int payloadLength = await GetMessageLengthAsync(token);
+ if (payloadLength == 0)
+ {
+ return null;
+ }
+
+ return await ReadBytesAsync(payloadLength, token);
+ }
+
+ /// <summary>
+ /// Helper method to read the specified number of bytes from the network stream.
+ /// </summary>
+ /// <param name="bytesToRead">The number of bytes to read</param>
+ /// <returns>The byte[] read from the network stream with the requested
+ /// number of bytes, otherwise null if the operation failed.
+ /// </returns>
+ private byte[] ReadBytes(int bytesToRead)
+ {
+ int totalBytesRead = 0;
+ byte[] buffer = new byte[bytesToRead];
+
+ while (totalBytesRead < bytesToRead)
+ {
+ int bytesRead = _stream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead);
+ if (bytesRead == 0)
+ {
+ // Read timed out or connection was closed
+ return null;
+ }
+
+ totalBytesRead += bytesRead;
+ }
+
+ return buffer;
+ }
+
+ /// <summary>
+ /// Helper method to read the specified number of bytes from the network stream.
+ /// </summary>
+ /// <param name="bytesToRead">The number of bytes to read</param>
+ /// <param name="token">The cancellation token</param>
+ /// <returns>The byte[] read from the network stream with the requested
+ /// number of bytes, otherwise null if the operation failed.
+ /// </returns>
+ private async Task<byte[]> ReadBytesAsync(int bytesToRead, CancellationToken token)
+ {
+ int bytesRead = 0;
+ byte[] buffer = new byte[bytesToRead];
+
+ while (bytesRead < bytesToRead)
+ {
+ int amountRead = await _stream.ReadAsync(buffer, bytesRead, bytesToRead - bytesRead, token);
+ if (amountRead == 0)
+ {
+ // Read timed out or connection was closed
+ return null;
+ }
+
+ bytesRead += amountRead;
+ }
+
+ return buffer;
+ }
+
+ /// <summary>
+ /// Generates the payload buffer containing the message along
+ /// with a header indicating the message length.
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ /// <returns>The payload buffer</returns>
+ private byte[] GenerateMessageBuffer(byte[] message)
+ {
+ byte[] lengthBuffer1 = BitConverter.GetBytes(message.Length + 4);
+ byte[] lengthBuffer2 = BitConverter.GetBytes(message.Length);
+ if (BitConverter.IsLittleEndian)
+ {
+ Array.Reverse(lengthBuffer1);
+ }
+
+ int len = lengthBuffer1.Length + lengthBuffer2.Length + message.Length;
+ byte[] messageBuffer = new byte[len];
+
+ int bytesCopied = 0;
+ bytesCopied += CopyBytes(lengthBuffer1, messageBuffer, 0);
+ bytesCopied += CopyBytes(lengthBuffer2, messageBuffer, bytesCopied);
+ CopyBytes(message, messageBuffer, bytesCopied);
+
+ return messageBuffer;
+ }
+
+ /// <summary>
+ /// Reads the first four bytes from the stream and decode
+ /// it to get the message length in bytes
+ /// </summary>
+ /// <returns>The incoming message's length in bytes</returns>
+ private int ReadMessageLength()
+ {
+ byte[] lenBytes = ReadBytes(sizeof(int));
+ if (lenBytes == null)
+ {
+ return 0;
+ }
+ if (BitConverter.IsLittleEndian)
+ {
+ Array.Reverse(lenBytes);
+ }
+ if (BitConverter.ToInt32(lenBytes, 0) == 0)
+ {
+ return 0;
+ }
+
+ byte[] msgLength = ReadBytes(sizeof(int));
+ return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
+ }
+
+ /// <summary>
+ /// Reads the first four bytes from the stream and decode
+ /// it to get the message length in bytes
+ /// </summary>
+ /// <param name="token">The cancellation token</param>
+ /// <returns>The incoming message's length in bytes</returns>
+ private async Task<int> GetMessageLengthAsync(CancellationToken token)
+ {
+ byte[] lenBytes = await ReadBytesAsync(sizeof(int), token);
+ if (lenBytes == null)
+ {
+ return 0;
+ }
+ if (BitConverter.IsLittleEndian)
+ {
+ Array.Reverse(lenBytes);
+ }
+ if (BitConverter.ToInt32(lenBytes, 0) == 0)
+ {
+ return 0;
+ }
+
+ byte[] msgLength = ReadBytes(sizeof(int));
+ return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
+ }
+
+ /// <summary>
+ /// Copies the entire source buffer into the destination buffer the specified
+ /// destination offset.
+ /// </summary>
+ /// <param name="source">The source buffer to be copied</param>
+ /// <param name="dest">The destination buffer to copy to</param>
+ /// <param name="destOffset">The offset at the destination buffer to begin
+ /// copying.</param>
+ /// <returns>The number of bytes copied</returns>
+ private int CopyBytes(byte[] source, byte[] dest, int destOffset)
+ {
+ Buffer.BlockCopy(source, 0, dest, destOffset, source.Length);
+ return source.Length;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
new file mode 100644
index 0000000..2bba3c8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Util;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Reactive;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Manages incoming and outgoing messages between remote hosts.
+ /// </summary>
+ public class DefaultRemoteManager<T> : IRemoteManager<T>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultRemoteManager<T>));
+
+ private ObserverContainer<T> _observerContainer;
+ private TransportServer<IRemoteEvent<T>> _server;
+ private Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+ private ICodec<IRemoteEvent<T>> _codec;
+
+ /// <summary>
+ /// Constructs a DefaultRemoteManager listening on the specified address and any
+ /// available port.
+ /// </summary>
+ /// <param name="localAddress">The address to listen on</param>
+ /// <param name="codec">The codec used for serializing messages</param>
+ public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec) : this(localAddress, 0, codec)
+ {
+ }
+
+ /// <summary>
+ /// Constructs a DefaultRemoteManager listening on the specified IPEndPoint.
+ /// </summary>
+ /// <param name="localEndpoint">The endpoint to listen on</param>
+ /// <param name="codec">The codec used for serializing messages</param>
+ public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec)
+ {
+ if (localEndpoint == null)
+ {
+ throw new ArgumentNullException("localEndpoint");
+ }
+ if (localEndpoint.Port < 0)
+ {
+ throw new ArgumentException("Listening port must be greater than or equal to zero");
+ }
+ if (codec == null)
+ {
+ throw new ArgumentNullException("codec");
+ }
+
+ _codec = new RemoteEventCodec<T>(codec);
+ _observerContainer = new ObserverContainer<T>();
+ _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+ // Begin to listen for incoming messages
+ _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
+ _server.Run();
+
+ LocalEndpoint = _server.LocalEndpoint;
+ Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+ }
+
+ /// <summary>
+ /// Constructs a DefaultRemoteManager listening on the specified address and any
+ /// available port.
+ /// </summary>
+ /// <param name="localAddress">The address to listen on</param>
+ /// <param name="port">The port to listen on</param>
+ /// <param name="codec">The codec used for serializing messages</param>
+ public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec)
+ {
+ if (localAddress == null)
+ {
+ throw new ArgumentNullException("localAddress");
+ }
+ if (port < 0)
+ {
+ throw new ArgumentException("Listening port must be greater than or equal to zero");
+ }
+ if (codec == null)
+ {
+ throw new ArgumentNullException("codec");
+ }
+
+ _observerContainer = new ObserverContainer<T>();
+ _codec = new RemoteEventCodec<T>(codec);
+ _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+ IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
+
+ // Begin to listen for incoming messages
+ _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec);
+ _server.Run();
+
+ LocalEndpoint = _server.LocalEndpoint;
+ Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+ }
+
+ /// <summary>
+ /// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
+ /// </summary>
+ /// <param name="codec">The codec used for serializing messages</param>
+ public DefaultRemoteManager(ICodec<T> codec)
+ {
+ using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
+ {
+ if (codec == null)
+ {
+ throw new ArgumentNullException("codec");
+ }
+
+ _observerContainer = new ObserverContainer<T>();
+ _codec = new RemoteEventCodec<T>(codec);
+ _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+
+ LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
+ Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+ }
+ }
+
+ /// <summary>
+ /// Gets the RemoteIdentifier for the DefaultRemoteManager
+ /// </summary>
+ public IRemoteIdentifier Identifier { get; private set; }
+
+ /// <summary>
+ /// Gets the local IPEndPoint for the DefaultRemoteManager
+ /// </summary>
+ public IPEndPoint LocalEndpoint { get; private set; }
+
+ /// <summary>
+ /// Returns an IObserver used to send messages to the remote host at
+ /// the specified IPEndpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+ /// <returns>An IObserver used to send messages to the remote host</returns>
+ public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+ if (id == null)
+ {
+ throw new ArgumentException("ID not supported");
+ }
+
+ return GetRemoteObserver(id.Addr);
+ }
+
+ /// <summary>
+ /// Returns an IObserver used to send messages to the remote host at
+ /// the specified IPEndpoint.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+ /// <returns>An IObserver used to send messages to the remote host</returns>
+ public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ ProxyObserver remoteObserver;
+ if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
+ {
+ TransportClient<IRemoteEvent<T>> client =
+ new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer);
+
+ remoteObserver = new ProxyObserver(client);
+ _cachedClients[remoteEndpoint] = remoteObserver;
+ }
+
+ return remoteObserver;
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+
+ SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+ if (id == null)
+ {
+ throw new ArgumentException("ID not supported");
+ }
+
+ return RegisterObserver(id.Addr, observer);
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+ {
+ if (remoteEndpoint == null)
+ {
+ throw new ArgumentNullException("remoteEndpoint");
+ }
+ if (observer == null)
+ {
+ throw new ArgumentNullException("observer");
+ }
+
+ return _observerContainer.RegisterObserver(remoteEndpoint, observer);
+ }
+
+ /// <summary>
+ /// Registers an IObserver used to handle incoming messages from the remote host
+ /// at the specified IPEndPoint.
+ /// The IDisposable that is returned can be used to unregister the IObserver.
+ /// </summary>
+ /// <param name="observer">The IObserver to handle incoming messages</param>
+ /// <returns>An IDisposable used to unregister the observer with</returns>
+ public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+ {
+ if (observer == null)
+ {
+ throw new ArgumentNullException("observer");
+ }
+
+ return _observerContainer.RegisterObserver(observer);
+ }
+
+ /// <summary>
+ /// Release all resources for the DefaultRemoteManager.
+ /// </summary>
+ public void Dispose()
+ {
+ foreach (ProxyObserver cachedClient in _cachedClients.Values)
+ {
+ cachedClient.Dispose();
+ }
+
+ if (_server != null)
+ {
+ _server.Dispose();
+ }
+ }
+
+ /// <summary>
+ /// Observer to send messages to connected remote host
+ /// </summary>
+ private class ProxyObserver : IObserver<T>, IDisposable
+ {
+ private TransportClient<IRemoteEvent<T>> _client;
+ private int _messageCount;
+
+ /// <summary>
+ /// Create new ProxyObserver
+ /// </summary>
+ /// <param name="client">The connected transport client used to send
+ /// messages to remote host</param>
+ public ProxyObserver(TransportClient<IRemoteEvent<T>> client)
+ {
+ _client = client;
+ _messageCount = 0;
+ }
+
+ /// <summary>
+ /// Send the message to the remote host
+ /// </summary>
+ /// <param name="message">The message to send</param>
+ public void OnNext(T message)
+ {
+ IRemoteEvent<T> remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message)
+ {
+ Sink = "default",
+ Sequence = _messageCount
+ };
+
+ _messageCount++;
+ _client.Send(remoteEvent);
+ }
+
+ /// <summary>
+ /// Close underlying transport client
+ /// </summary>
+ public void Dispose()
+ {
+ _client.Dispose();
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
new file mode 100644
index 0000000..5b24276
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ class DefaultRemoteMessage<T> : IRemoteMessage<T>
+ {
+ public DefaultRemoteMessage(IRemoteIdentifier id, T message)
+ {
+ Identifier = id;
+ Message = message;
+ }
+
+ public IRemoteIdentifier Identifier { get; private set; }
+
+ public T Message { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
new file mode 100644
index 0000000..8d4b47d
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ /// <summary>
+ /// Class to compare two IPEndPoint objects.
+ /// </summary>
+ internal class IPEndPointComparer : IEqualityComparer<IPEndPoint>
+ {
+ public bool Equals(IPEndPoint x, IPEndPoint y)
+ {
+ if (ReferenceEquals(x, y))
+ {
+ return true;
+ }
+ if (x == null || y == null)
+ {
+ return false;
+ }
+
+ // If either port is 0, don't check port
+ if (x.Port == 0 || y.Port == 0)
+ {
+ return x.Address.Equals(y.Address);
+ }
+
+ return x.Equals(y);
+ }
+
+ public int GetHashCode(IPEndPoint obj)
+ {
+ return obj.Address.GetHashCode();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
new file mode 100644
index 0000000..e413023
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+ public class IntCodec : ICodec<int>
+ {
+ [Inject]
+ public IntCodec()
+ {
+ }
+
+ public byte[] Encode(int obj)
+ {
+ return BitConverter.GetBytes(obj);
+ }
+
+ public int Decode(byte[] data)
+ {
+ return BitConverter.ToInt32(data, 0);
+ }
+ }
+}