You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/06/29 18:25:33 UTC
[2/2] incubator-reef git commit: [REEF-423]: Move Streaming Codec classes from Network to Wake layer
[REEF-423]: Move Streaming Codec classes from Network to Wake layer
JIRA:
[REEF-423](https://issues.apache.org/jira/browse/REEF-423)
Pull Request:
This closes #257
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e70bb56c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e70bb56c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e70bb56c
Branch: refs/heads/master
Commit: e70bb56c20834d96440b541bc5e425dfe53283c7
Parents: 6eac41a
Author: Dhruv <dh...@gmail.com>
Authored: Sun Jun 28 15:39:39 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Mon Jun 29 09:24:31 2015 -0700
----------------------------------------------------------------------
.../BroadcastReduceDriver.cs | 2 +-
.../PipelinedBroadcastReduceDriver.cs | 2 +-
.../ScatterReduceDriver.cs | 2 +-
.../GroupCommunicationTests.cs | 4 +-
.../GroupCommunicationTreeTopologyTests.cs | 2 +-
.../GroupCommunication/StreamingCodecTests.cs | 4 +-
.../CodecToStreamingCodecConfiguration.cs | 2 +-
.../Group/Config/StreamingCodecConfiguration.cs | 2 +-
.../Driver/Impl/GroupCommunicationMessage.cs | 2 +-
.../Pipelining/StreamingPipelineMessageCodec.cs | 2 +-
.../Group/Task/Impl/OperatorTopology.cs | 2 +-
.../Org.Apache.REEF.Network.csproj | 9 --
.../StreamingCodec/CodecToStreamingCodec.cs | 71 ------------
.../DoubleArrayStreamingCodec.cs | 109 -------------------
.../DoubleStreamingCodec.cs | 82 --------------
.../FloatArrayStreamingCodec.cs | 109 -------------------
.../FloatStreamingCodec.cs | 82 --------------
.../IntArrayStreamingCodec.cs | 109 -------------------
.../CommonStreamingCodecs/IntStreamingCodec.cs | 82 --------------
.../StringStreamingCodec.cs | 82 --------------
.../StreamingCodec/IStreamingCodec.cs | 61 -----------
.../Org.Apache.REEF.Wake.csproj | 9 ++
.../StreamingCodec/CodecToStreamingCodec.cs | 71 ++++++++++++
.../DoubleArrayStreamingCodec.cs | 109 +++++++++++++++++++
.../DoubleStreamingCodec.cs | 82 ++++++++++++++
.../FloatArrayStreamingCodec.cs | 109 +++++++++++++++++++
.../FloatStreamingCodec.cs | 82 ++++++++++++++
.../IntArrayStreamingCodec.cs | 109 +++++++++++++++++++
.../CommonStreamingCodecs/IntStreamingCodec.cs | 82 ++++++++++++++
.../StringStreamingCodec.cs | 82 ++++++++++++++
.../StreamingCodec/IStreamingCodec.cs | 61 +++++++++++
31 files changed, 809 insertions(+), 809 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
index 85ae3c0..051b23d 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
@@ -27,7 +27,7 @@ using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
index f32de61..6675fb5 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
@@ -27,7 +27,6 @@ using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
@@ -44,6 +43,7 @@ using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Parameters;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
index 8a9d3e2..10595fd 100644
--- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
@@ -27,7 +27,6 @@ using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
-using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
using Org.Apache.REEF.Network.Group.Config;
@@ -44,6 +43,7 @@ using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index f2bec7c..61813af 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -30,7 +30,6 @@ using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
using Org.Apache.REEF.Network.Examples.GroupCommunication;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
@@ -43,7 +42,6 @@ using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Network.Group.Topology;
using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Network.StreamingCodec;
using Org.Apache.REEF.Network.Tests.NamingService;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
@@ -53,6 +51,8 @@ using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace Org.Apache.REEF.Network.Tests.GroupCommunication
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
index 88ae9b3..6b9b8c7 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -21,7 +21,6 @@ using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -34,6 +33,7 @@ using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace Org.Apache.REEF.Network.Tests.GroupCommunication
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
index d4f0647..1757c6c 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs
@@ -22,13 +22,13 @@ using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Network.StreamingCodec;
-using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace Org.Apache.REEF.Network.Tests.GroupCommunication
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
index a270272..f8e1483 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
@@ -18,10 +18,10 @@
*/
using Org.Apache.REEF.Network.Group.Pipelining;
-using Org.Apache.REEF.Network.StreamingCodec;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Network.Group.Config
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
index f655b91..2a30047 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
@@ -18,9 +18,9 @@
*/
using Org.Apache.REEF.Network.Group.Pipelining;
-using Org.Apache.REEF.Network.StreamingCodec;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Network.Group.Config
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
index 33f9c92..ed7855b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -19,9 +19,9 @@
using System;
using System.Threading;
-using Org.Apache.REEF.Network.StreamingCodec;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs
index aff8558..9236e95 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/StreamingPipelineMessageCodec.cs
@@ -21,7 +21,7 @@ using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Remote;
using System.Threading;
using System.Threading.Tasks;
-using Org.Apache.REEF.Network.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Network.Group.Pipelining
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index 67e1da9..6ecf7f3 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -29,10 +29,10 @@ using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Operators.Impl;
using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Network.StreamingCodec;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Network.Group.Task.Impl
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 390bbb4..7b167b4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -52,13 +52,6 @@ under the License.
<ItemGroup>
<Compile Include="Group\Config\CodecToStreamingCodecConfiguration.cs" />
<Compile Include="Group\Driver\Impl\GeneralGroupCommunicationMessage.cs" />
- <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatArrayStreamingCodec.cs" />
- <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleArrayStreamingCodec.cs" />
- <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatStreamingCodec.cs" />
- <Compile Include="StreamingCodec\CommonStreamingCodecs\IntArrayStreamingCodec.cs" />
- <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleStreamingCodec.cs" />
- <Compile Include="StreamingCodec\CommonStreamingCodecs\StringStreamingCodec.cs" />
- <Compile Include="StreamingCodec\CommonStreamingCodecs\IntStreamingCodec.cs" />
<Compile Include="Group\Config\CodecConfiguration.cs" />
<Compile Include="Group\Config\GroupCommConfigurationOptions.cs" />
<Compile Include="Group\Config\PipelineDataConverterConfiguration.cs" />
@@ -156,8 +149,6 @@ under the License.
<Compile Include="NetworkService\WritableNsConnection.cs" />
<Compile Include="NetworkService\WritableNsMessage.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
- <Compile Include="StreamingCodec\CodecToStreamingCodec.cs" />
- <Compile Include="StreamingCodec\IStreamingCodec.cs" />
<Compile Include="Utilities\BlockingCollectionExtensions.cs" />
<Compile Include="Utilities\Utils.cs" />
</ItemGroup>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs
deleted file mode 100644
index 1964bb1..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CodecToStreamingCodec.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec
-{
- /// <summary>
- /// Converts codec to streaming codec
- /// </summary>
- /// <typeparam name="T">Message type</typeparam>
- public sealed class CodecToStreamingCodec<T> : IStreamingCodec<T>
- {
- private readonly ICodec<T> _codec;
-
- [Inject]
- private CodecToStreamingCodec(ICodec<T> codec)
- {
- _codec = codec;
- }
-
- public T Read(IDataReader reader)
- {
- int length = reader.ReadInt32();
- byte[] byteArr = new byte[length];
- reader.Read(ref byteArr, 0, length);
- return _codec.Decode(byteArr);
- }
-
- public void Write(T obj, IDataWriter writer)
- {
- var byteArr = _codec.Encode(obj);
- writer.WriteInt32(byteArr.Length);
- writer.Write(byteArr, 0, byteArr.Length);
- }
-
- public async Task<T> ReadAsync(IDataReader reader, CancellationToken token)
- {
- int length = await reader.ReadInt32Async(token);
- byte[] byteArr = new byte[length];
- await reader.ReadAsync(byteArr, 0, length, token);
- return _codec.Decode(byteArr);
- }
-
- public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token)
- {
- var byteArr = _codec.Encode(obj);
- await writer.WriteInt32Async(byteArr.Length, token);
- await writer.WriteAsync(byteArr, 0, byteArr.Length, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
deleted file mode 100644
index 5caaaee..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
-{
- /// <summary>
- /// Streaming codec for double array
- /// </summary>
- public sealed class DoubleArrayStreamingCodec : IStreamingCodec<double[]>
- {
- /// <summary>
- /// Injectable constructor
- /// </summary>
- [Inject]
- private DoubleArrayStreamingCodec()
- {
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- ///<returns>The double array read from the reader</returns>
- public double[] Read(IDataReader reader)
- {
- int length = reader.ReadInt32();
- byte[] buffer = new byte[sizeof(double)*length];
- reader.Read(ref buffer, 0, buffer.Length);
- double[] doubleArr = new double[length];
- Buffer.BlockCopy(buffer, 0, doubleArr, 0, buffer.Length);
- return doubleArr;
- }
-
- /// <summary>
- /// Writes the double array to the writer.
- /// </summary>
- /// <param name="obj">The double array to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- public void Write(double[] obj, IDataWriter writer)
- {
- if (obj == null)
- {
- throw new ArgumentNullException("obj", "double array is null");
- }
-
- writer.WriteInt32(obj.Length);
- byte[] buffer = new byte[sizeof(double) * obj.Length];
- Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
- writer.Write(buffer, 0, buffer.Length);
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- /// <param name="token">Cancellation token</param>
- /// <returns>The double array read from the reader</returns>
- public async Task<double[]> ReadAsync(IDataReader reader, CancellationToken token)
- {
- int length = await reader.ReadInt32Async(token);
- byte[] buffer = new byte[sizeof(double) * length];
- await reader.ReadAsync(buffer, 0, buffer.Length, token);
- double[] doubleArr = new double[length];
- Buffer.BlockCopy(buffer, 0, doubleArr, 0, sizeof(double) * length);
- return doubleArr;
- }
-
- /// <summary>
- /// Writes the double array to the writer.
- /// </summary>
- /// <param name="obj">The double array to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">Cancellation token</param>
- public async Task WriteAsync(double[] obj, IDataWriter writer, CancellationToken token)
- {
- if (obj == null)
- {
- throw new ArgumentNullException("obj", "double array is null");
- }
-
- await writer.WriteInt32Async(obj.Length, token);
- byte[] buffer = new byte[sizeof(double) * obj.Length];
- Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(double) * obj.Length);
- await writer.WriteAsync(buffer, 0, buffer.Length, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
deleted file mode 100644
index 5f19f00..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
-{
- /// <summary>
- /// Streaming codec for double
- /// </summary>
- public sealed class DoubleStreamingCodec : IStreamingCodec<double>
- {
- /// <summary>
- /// Injectable constructor
- /// </summary>
- [Inject]
- private DoubleStreamingCodec()
- {
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- ///<returns>The double read from the reader</returns>
- public double Read(IDataReader reader)
- {
- return reader.ReadDouble();
- }
-
- /// <summary>
- /// Writes the double to the writer.
- /// </summary>
- /// <param name="obj">The double to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- public void Write(double obj, IDataWriter writer)
- {
- writer.WriteDouble(obj);
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- /// <param name="token">Cancellation token</param>
- /// <returns>The double read from the reader</returns>
- public async Task<double> ReadAsync(IDataReader reader, CancellationToken token)
- {
- return await reader.ReadDoubleAsync(token);
- }
-
- /// <summary>
- /// Writes the double to the writer.
- /// </summary>
- /// <param name="obj">The double to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">Cancellation token</param>
- public async Task WriteAsync(double obj, IDataWriter writer, CancellationToken token)
- {
- await writer.WriteDoubleAsync(obj, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
deleted file mode 100644
index 8d68749..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
-{
- /// <summary>
- /// Streaming codec for float array
- /// </summary>
- public sealed class FloatArrayStreamingCodec : IStreamingCodec<float[]>
- {
- /// <summary>
- /// Injectable constructor
- /// </summary>
- [Inject]
- private FloatArrayStreamingCodec()
- {
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- ///<returns>The float array read from the reader</returns>
- public float[] Read(IDataReader reader)
- {
- int length = reader.ReadInt32();
- byte[] buffer = new byte[sizeof(float)*length];
- reader.Read(ref buffer, 0, buffer.Length);
- float[] floatArr = new float[length];
- Buffer.BlockCopy(buffer, 0, floatArr, 0, buffer.Length);
- return floatArr;
- }
-
- /// <summary>
- /// Writes the float array to the writer.
- /// </summary>
- /// <param name="obj">The float array to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- public void Write(float[] obj, IDataWriter writer)
- {
- if (obj == null)
- {
- throw new ArgumentNullException("obj", "float array is null");
- }
-
- writer.WriteInt32(obj.Length);
- byte[] buffer = new byte[sizeof(float) * obj.Length];
- Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
- writer.Write(buffer, 0, buffer.Length);
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- /// <param name="token">Cancellation token</param>
- /// <returns>The float array read from the reader</returns>
- public async Task<float[]> ReadAsync(IDataReader reader, CancellationToken token)
- {
- int length = await reader.ReadInt32Async(token);
- byte[] buffer = new byte[sizeof(float) * length];
- await reader.ReadAsync(buffer, 0, buffer.Length, token);
- float[] floatArr = new float[length];
- Buffer.BlockCopy(buffer, 0, floatArr, 0, sizeof(float) * length);
- return floatArr;
- }
-
- /// <summary>
- /// Writes the float array to the writer.
- /// </summary>
- /// <param name="obj">The float array to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">Cancellation token</param>
- public async Task WriteAsync(float[] obj, IDataWriter writer, CancellationToken token)
- {
- if (obj == null)
- {
- throw new ArgumentNullException("obj", "float array is null");
- }
-
- await writer.WriteInt32Async(obj.Length, token);
- byte[] buffer = new byte[sizeof(float) * obj.Length];
- Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(float) * obj.Length);
- await writer.WriteAsync(buffer, 0, buffer.Length, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
deleted file mode 100644
index 22ed947..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
-{
- /// <summary>
- /// Streaming codec for float
- /// </summary>
- public sealed class FloatStreamingCodec : IStreamingCodec<float>
- {
- /// <summary>
- /// Injectable constructor
- /// </summary>
- [Inject]
- private FloatStreamingCodec()
- {
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- ///<returns>The float read from the reader</returns>
- public float Read(IDataReader reader)
- {
- return reader.ReadFloat();
- }
-
- /// <summary>
- /// Writes the float to the writer.
- /// </summary>
- /// <param name="obj">The float to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- public void Write(float obj, IDataWriter writer)
- {
- writer.WriteFloat(obj);
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- /// <param name="token">Cancellation token</param>
- /// <returns>The float read from the reader</returns>
- public async Task<float> ReadAsync(IDataReader reader, CancellationToken token)
- {
- return await reader.ReadFloatAsync(token);
- }
-
- /// <summary>
- /// Writes the float to the writer.
- /// </summary>
- /// <param name="obj">The float to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">Cancellation token</param>
- public async Task WriteAsync(float obj, IDataWriter writer, CancellationToken token)
- {
- await writer.WriteFloatAsync(obj, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
deleted file mode 100644
index 090d99f..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
-{
- /// <summary>
- /// Streaming codec for integer array
- /// </summary>
- public sealed class IntArrayStreamingCodec : IStreamingCodec<int[]>
- {
- /// <summary>
- /// Injectable constructor
- /// </summary>
- [Inject]
- private IntArrayStreamingCodec()
- {
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- ///<returns>The integer array read from the reader</returns>
- public int[] Read(IDataReader reader)
- {
- int length = reader.ReadInt32();
- byte[] buffer = new byte[sizeof(int)*length];
- reader.Read(ref buffer, 0, buffer.Length);
- int[] intArr = new int[length];
- Buffer.BlockCopy(buffer, 0, intArr, 0, buffer.Length);
- return intArr;
- }
-
- /// <summary>
- /// Writes the integer array to the writer.
- /// </summary>
- /// <param name="obj">The integer array to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- public void Write(int[] obj, IDataWriter writer)
- {
- if (obj == null)
- {
- throw new ArgumentNullException("obj", "integer array is null");
- }
-
- writer.WriteInt32(obj.Length);
- byte[] buffer = new byte[sizeof(int) * obj.Length];
- Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
- writer.Write(buffer, 0, buffer.Length);
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- /// <param name="token">Cancellation token</param>
- /// <returns>The integer array read from the reader</returns>
- public async Task<int[]> ReadAsync(IDataReader reader, CancellationToken token)
- {
- int length = await reader.ReadInt32Async(token);
- byte[] buffer = new byte[sizeof(int) * length];
- await reader.ReadAsync(buffer, 0, buffer.Length, token);
- int[] intArr = new int[length];
- Buffer.BlockCopy(buffer, 0, intArr, 0, sizeof(int) * length);
- return intArr;
- }
-
- /// <summary>
- /// Writes the integer array to the writer.
- /// </summary>
- /// <param name="obj">The integer array to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">Cancellation token</param>
- public async Task WriteAsync(int[] obj, IDataWriter writer, CancellationToken token)
- {
- if (obj == null)
- {
- throw new ArgumentNullException("obj", "integer array is null");
- }
-
- await writer.WriteInt32Async(obj.Length, token);
- byte[] buffer = new byte[sizeof(int) * obj.Length];
- Buffer.BlockCopy(obj, 0, buffer, 0, sizeof(int) * obj.Length);
- await writer.WriteAsync(buffer, 0, buffer.Length, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
deleted file mode 100644
index 22e5490..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
-{
- /// <summary>
- /// Streaming codec for integer
- /// </summary>
- public sealed class IntStreamingCodec : IStreamingCodec<int>
- {
- /// <summary>
- /// Injectable constructor
- /// </summary>
- [Inject]
- private IntStreamingCodec()
- {
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- ///<returns>The iinteger read from the reader</returns>
- public int Read(IDataReader reader)
- {
- return reader.ReadInt32();
- }
-
- /// <summary>
- /// Writes the integer to the writer.
- /// </summary>
- /// <param name="obj">The integer to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- public void Write(int obj, IDataWriter writer)
- {
- writer.WriteInt32(obj);
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- /// <param name="token">Cancellation token</param>
- /// <returns>The integer read from the reader</returns>
- public async Task<int> ReadAsync(IDataReader reader, CancellationToken token)
- {
- return await reader.ReadInt32Async(token);
- }
-
- /// <summary>
- /// Writes the integer to the writer.
- /// </summary>
- /// <param name="obj">The integer to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">Cancellation token</param>
- public async Task WriteAsync(int obj, IDataWriter writer, CancellationToken token)
- {
- await writer.WriteInt32Async(obj, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
deleted file mode 100644
index 63036f5..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs
-{
- /// <summary>
- /// Streaming codec for string
- /// </summary>
- public sealed class StringStreamingCodec : IStreamingCodec<string>
- {
- /// <summary>
- /// Injectable constructor
- /// </summary>
- [Inject]
- private StringStreamingCodec()
- {
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- ///<returns>The string read from the reader</returns>
- public string Read(IDataReader reader)
- {
- return reader.ReadString();
- }
-
- /// <summary>
- /// Writes the string to the writer.
- /// </summary>
- /// <param name="obj">The string to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- public void Write(string obj, IDataWriter writer)
- {
- writer.WriteString(obj);
- }
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- /// <param name="token">Cancellation token</param>
- /// <returns>The string read from the reader</returns>
- public async Task<string> ReadAsync(IDataReader reader, CancellationToken token)
- {
- return await reader.ReadStringAsync(token);
- }
-
- /// <summary>
- /// Writes the string to the writer.
- /// </summary>
- /// <param name="obj">The string to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">Cancellation token</param>
- public async Task WriteAsync(string obj, IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(obj, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs
deleted file mode 100644
index 440b1c0..0000000
--- a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/IStreamingCodec.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.StreamingCodec
-{
- /// <summary>
- /// Codec Interface that external users should implement to directly write to the stream
- /// </summary>
- public interface IStreamingCodec<T>
- {
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- ///<returns>The instance of type T read from the reader</returns>
- T Read(IDataReader reader);
-
- /// <summary>
- /// Writes the class fields to the writer.
- /// </summary>
- /// <param name="obj">The object of type T to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- void Write(T obj, IDataWriter writer);
-
- /// <summary>
- /// Instantiate the class from the reader.
- /// </summary>
- /// <param name="reader">The reader from which to read</param>
- /// <param name="token">Cancellation token</param>
- /// <returns>The instance of type T read from the reader</returns>
- Task<T> ReadAsync(IDataReader reader, CancellationToken token);
-
- /// <summary>
- /// Writes the class fields to the writer.
- /// </summary>
- /// <param name="obj">The object of type T to be encoded</param>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">Cancellation token</param>
- Task WriteAsync(T obj, IDataWriter writer, CancellationToken token);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 170c967..d2b2970 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -131,6 +131,15 @@ under the License.
<Compile Include="RX\IStaticObservable.cs" />
<Compile Include="RX\ISubject.cs" />
<Compile Include="RX\ObserverCompletedException.cs" />
+ <Compile Include="StreamingCodec\CodecToStreamingCodec.cs" />
+ <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleArrayStreamingCodec.cs" />
+ <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleStreamingCodec.cs" />
+ <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatArrayStreamingCodec.cs" />
+ <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatStreamingCodec.cs" />
+ <Compile Include="StreamingCodec\CommonStreamingCodecs\IntArrayStreamingCodec.cs" />
+ <Compile Include="StreamingCodec\CommonStreamingCodecs\IntStreamingCodec.cs" />
+ <Compile Include="StreamingCodec\CommonStreamingCodecs\StringStreamingCodec.cs" />
+ <Compile Include="StreamingCodec\IStreamingCodec.cs" />
<Compile Include="Time\Event\Alarm.cs" />
<Compile Include="Time\Event\StartTime.cs" />
<Compile Include="Time\Event\StopTime.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CodecToStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CodecToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CodecToStreamingCodec.cs
new file mode 100644
index 0000000..61b39ab
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CodecToStreamingCodec.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec
+{
+    /// <summary>
+    /// Adapts a byte-array <see cref="ICodec{T}"/> to the <see cref="IStreamingCodec{T}"/>
+    /// interface by framing every message as a 32-bit length followed by the encoded bytes.
+    /// </summary>
+    /// <typeparam name="T">Message type</typeparam>
+    public sealed class CodecToStreamingCodec<T> : IStreamingCodec<T>
+    {
+        private readonly ICodec<T> _codec;
+
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        /// <param name="codec">The codec used to encode and decode the message payload</param>
+        [Inject]
+        private CodecToStreamingCodec(ICodec<T> codec)
+        {
+            _codec = codec;
+        }
+
+        /// <summary>
+        /// Reads a length-prefixed payload from the reader and decodes it.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The message decoded from the payload</returns>
+        public T Read(IDataReader reader)
+        {
+            int payloadSize = reader.ReadInt32();
+            byte[] payload = new byte[payloadSize];
+            reader.Read(ref payload, 0, payloadSize);
+            return _codec.Decode(payload);
+        }
+
+        /// <summary>
+        /// Encodes the message and writes it to the writer as a length-prefixed payload.
+        /// </summary>
+        /// <param name="obj">The message to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(T obj, IDataWriter writer)
+        {
+            byte[] payload = _codec.Encode(obj);
+            writer.WriteInt32(payload.Length);
+            writer.Write(payload, 0, payload.Length);
+        }
+
+        /// <summary>
+        /// Asynchronously reads a length-prefixed payload from the reader and decodes it.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The message decoded from the payload</returns>
+        public async Task<T> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int payloadSize = await reader.ReadInt32Async(token);
+            byte[] payload = new byte[payloadSize];
+            await reader.ReadAsync(payload, 0, payloadSize, token);
+            return _codec.Decode(payload);
+        }
+
+        /// <summary>
+        /// Encodes the message and asynchronously writes it to the writer as a length-prefixed payload.
+        /// </summary>
+        /// <param name="obj">The message to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token)
+        {
+            byte[] payload = _codec.Encode(obj);
+            await writer.WriteInt32Async(payload.Length, token);
+            await writer.WriteAsync(payload, 0, payload.Length, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
new file mode 100644
index 0000000..a4d8654
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleArrayStreamingCodec.cs
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for double array. The array travels as a 32-bit element count
+    /// followed by the raw bytes of its elements.
+    /// </summary>
+    public sealed class DoubleArrayStreamingCodec : IStreamingCodec<double[]>
+    {
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        [Inject]
+        private DoubleArrayStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads a double array (element count, then raw element bytes) from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The double array read from the reader</returns>
+        /// <exception cref="OverflowException">The element count is negative, or its
+        /// size in bytes does not fit in an int</exception>
+        public double[] Read(IDataReader reader)
+        {
+            int length = reader.ReadInt32();
+
+            // The count is read from the stream, so size the buffer in a checked context:
+            // an oversized count must throw rather than wrap around to a short buffer.
+            byte[] buffer = new byte[checked(sizeof(double) * length)];
+
+            // NOTE(review): the result of Read is not inspected - presumably the reader
+            // fills the whole requested range; confirm against IDataReader.
+            reader.Read(ref buffer, 0, buffer.Length);
+            double[] doubleArr = new double[length];
+            Buffer.BlockCopy(buffer, 0, doubleArr, 0, buffer.Length);
+            return doubleArr;
+        }
+
+        /// <summary>
+        /// Writes the double array to the writer as its element count followed by its raw bytes.
+        /// </summary>
+        /// <param name="obj">The double array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <exception cref="ArgumentNullException">The array is null</exception>
+        /// <exception cref="OverflowException">The size of the array in bytes does not fit in an int</exception>
+        public void Write(double[] obj, IDataWriter writer)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "double array is null");
+            }
+
+            // Build the payload before touching the writer so that a size overflow cannot
+            // leave a count on the stream with no data behind it.
+            byte[] buffer = new byte[checked(sizeof(double) * obj.Length)];
+            Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
+
+            writer.WriteInt32(obj.Length);
+            writer.Write(buffer, 0, buffer.Length);
+        }
+
+        /// <summary>
+        /// Asynchronously reads a double array (element count, then raw element bytes) from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The double array read from the reader</returns>
+        /// <exception cref="OverflowException">The element count is negative, or its
+        /// size in bytes does not fit in an int</exception>
+        public async Task<double[]> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int length = await reader.ReadInt32Async(token);
+
+            // Checked for the same reason as in Read: never let the byte size wrap around.
+            byte[] buffer = new byte[checked(sizeof(double) * length)];
+            await reader.ReadAsync(buffer, 0, buffer.Length, token);
+            double[] doubleArr = new double[length];
+            Buffer.BlockCopy(buffer, 0, doubleArr, 0, buffer.Length);
+            return doubleArr;
+        }
+
+        /// <summary>
+        /// Asynchronously writes the double array to the writer as its element count followed by its raw bytes.
+        /// </summary>
+        /// <param name="obj">The double array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        /// <exception cref="ArgumentNullException">The array is null</exception>
+        /// <exception cref="OverflowException">The size of the array in bytes does not fit in an int</exception>
+        public async Task WriteAsync(double[] obj, IDataWriter writer, CancellationToken token)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "double array is null");
+            }
+
+            // Payload first, writer second - see Write.
+            byte[] buffer = new byte[checked(sizeof(double) * obj.Length)];
+            Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
+
+            await writer.WriteInt32Async(obj.Length, token);
+            await writer.WriteAsync(buffer, 0, buffer.Length, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
new file mode 100644
index 0000000..6cca28f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/DoubleStreamingCodec.cs
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec that transfers a single double through the reader and writer primitives
+    /// </summary>
+    public sealed class DoubleStreamingCodec : IStreamingCodec<double>
+    {
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        [Inject]
+        private DoubleStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one double from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The double read from the reader</returns>
+        public double Read(IDataReader reader)
+        {
+            double value = reader.ReadDouble();
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one double to the writer.
+        /// </summary>
+        /// <param name="obj">The double to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(double obj, IDataWriter writer)
+        {
+            writer.WriteDouble(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously reads one double from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The double read from the reader</returns>
+        public async Task<double> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            double value = await reader.ReadDoubleAsync(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously writes one double to the writer.
+        /// </summary>
+        /// <param name="obj">The double to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(double obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteDoubleAsync(obj, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
new file mode 100644
index 0000000..72b6140
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatArrayStreamingCodec.cs
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for float array. The array travels as a 32-bit element count
+    /// followed by the raw bytes of its elements.
+    /// </summary>
+    public sealed class FloatArrayStreamingCodec : IStreamingCodec<float[]>
+    {
+        /// <summary>
+        /// Injectable constructor
+        /// </summary>
+        [Inject]
+        private FloatArrayStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads a float array (element count, then raw element bytes) from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The float array read from the reader</returns>
+        /// <exception cref="OverflowException">The element count is negative, or its
+        /// size in bytes does not fit in an int</exception>
+        public float[] Read(IDataReader reader)
+        {
+            int length = reader.ReadInt32();
+
+            // The count is read from the stream, so size the buffer in a checked context:
+            // an oversized count must throw rather than wrap around to a short buffer.
+            byte[] buffer = new byte[checked(sizeof(float) * length)];
+
+            // NOTE(review): the result of Read is not inspected - presumably the reader
+            // fills the whole requested range; confirm against IDataReader.
+            reader.Read(ref buffer, 0, buffer.Length);
+            float[] floatArr = new float[length];
+            Buffer.BlockCopy(buffer, 0, floatArr, 0, buffer.Length);
+            return floatArr;
+        }
+
+        /// <summary>
+        /// Writes the float array to the writer as its element count followed by its raw bytes.
+        /// </summary>
+        /// <param name="obj">The float array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <exception cref="ArgumentNullException">The array is null</exception>
+        /// <exception cref="OverflowException">The size of the array in bytes does not fit in an int</exception>
+        public void Write(float[] obj, IDataWriter writer)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "float array is null");
+            }
+
+            // Build the payload before touching the writer so that a size overflow cannot
+            // leave a count on the stream with no data behind it.
+            byte[] buffer = new byte[checked(sizeof(float) * obj.Length)];
+            Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
+
+            writer.WriteInt32(obj.Length);
+            writer.Write(buffer, 0, buffer.Length);
+        }
+
+        /// <summary>
+        /// Asynchronously reads a float array (element count, then raw element bytes) from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The float array read from the reader</returns>
+        /// <exception cref="OverflowException">The element count is negative, or its
+        /// size in bytes does not fit in an int</exception>
+        public async Task<float[]> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int length = await reader.ReadInt32Async(token);
+
+            // Checked for the same reason as in Read: never let the byte size wrap around.
+            byte[] buffer = new byte[checked(sizeof(float) * length)];
+            await reader.ReadAsync(buffer, 0, buffer.Length, token);
+            float[] floatArr = new float[length];
+            Buffer.BlockCopy(buffer, 0, floatArr, 0, buffer.Length);
+            return floatArr;
+        }
+
+        /// <summary>
+        /// Asynchronously writes the float array to the writer as its element count followed by its raw bytes.
+        /// </summary>
+        /// <param name="obj">The float array to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        /// <exception cref="ArgumentNullException">The array is null</exception>
+        /// <exception cref="OverflowException">The size of the array in bytes does not fit in an int</exception>
+        public async Task WriteAsync(float[] obj, IDataWriter writer, CancellationToken token)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "float array is null");
+            }
+
+            // Payload first, writer second - see Write.
+            byte[] buffer = new byte[checked(sizeof(float) * obj.Length)];
+            Buffer.BlockCopy(obj, 0, buffer, 0, buffer.Length);
+
+            await writer.WriteInt32Async(obj.Length, token);
+            await writer.WriteAsync(buffer, 0, buffer.Length, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
new file mode 100644
index 0000000..7bc6215
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/FloatStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for a single float; each operation forwards to the matching float call on the reader or writer.
+    /// </summary>
+    public sealed class FloatStreamingCodec : IStreamingCodec<float>
+    {
+        /// <summary>Injectable constructor; private, so the class has no public constructor.</summary>
+        [Inject]
+        private FloatStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one float from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The float read from the reader</returns>
+        public float Read(IDataReader reader)
+        {
+            float value = reader.ReadFloat();
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one float to the writer.
+        /// </summary>
+        /// <param name="obj">The float to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(float obj, IDataWriter writer)
+        {
+            writer.WriteFloat(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously reads one float from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token forwarded to the reader</param>
+        /// <returns>The float read from the reader</returns>
+        public async Task<float> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            float value = await reader.ReadFloatAsync(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously writes one float to the writer.
+        /// </summary>
+        /// <param name="obj">The float to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token forwarded to the writer</param>
+        public async Task WriteAsync(float obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteFloatAsync(obj, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
new file mode 100644
index 0000000..186cae0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntArrayStreamingCodec.cs
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for integer arrays, encoded as an Int32 element count followed by the raw element bytes.
+    /// </summary>
+    public sealed class IntArrayStreamingCodec : IStreamingCodec<int[]>
+    {
+        /// <summary>
+        /// Injectable constructor; private, so the class has no public constructor.
+        /// </summary>
+        [Inject]
+        private IntArrayStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads the element count, then the raw bytes of that many integers, and rebuilds the array.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The integer array read from the reader</returns>
+        public int[] Read(IDataReader reader)
+        {
+            int count = reader.ReadInt32();
+            byte[] raw = new byte[count * sizeof(int)];
+            reader.Read(ref raw, 0, raw.Length);
+            int[] decoded = new int[count];
+            Buffer.BlockCopy(raw, 0, decoded, 0, raw.Length);
+            return decoded;
+        }
+
+        /// <summary>
+        /// Writes the element count and then the raw bytes of the integer array.
+        /// </summary>
+        /// <param name="obj">The integer array to be encoded; must not be null</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <exception cref="ArgumentNullException">Thrown when obj is null</exception>
+        public void Write(int[] obj, IDataWriter writer)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "integer array is null");
+            }
+            writer.WriteInt32(obj.Length);
+            byte[] raw = new byte[obj.Length * sizeof(int)];
+            Buffer.BlockCopy(obj, 0, raw, 0, raw.Length);
+            writer.Write(raw, 0, raw.Length);
+        }
+
+        /// <summary>
+        /// Asynchronously reads the element count and the raw element bytes, then rebuilds the array.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token handed to each asynchronous read</param>
+        /// <returns>The integer array read from the reader</returns>
+        public async Task<int[]> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int count = await reader.ReadInt32Async(token);
+            byte[] raw = new byte[count * sizeof(int)];
+            await reader.ReadAsync(raw, 0, raw.Length, token);
+            int[] decoded = new int[count];
+            Buffer.BlockCopy(raw, 0, decoded, 0, raw.Length);
+            return decoded;
+        }
+
+        /// <summary>
+        /// Asynchronously writes the element count and then the raw bytes of the integer array.
+        /// </summary>
+        /// <param name="obj">The integer array to be encoded; must not be null</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token handed to each asynchronous write</param>
+        /// <exception cref="ArgumentNullException">Reported through the returned task when obj is null</exception>
+        public async Task WriteAsync(int[] obj, IDataWriter writer, CancellationToken token)
+        {
+            if (obj == null)
+            {
+                throw new ArgumentNullException("obj", "integer array is null");
+            }
+            await writer.WriteInt32Async(obj.Length, token);
+            byte[] raw = new byte[obj.Length * sizeof(int)];
+            Buffer.BlockCopy(obj, 0, raw, 0, raw.Length);
+            await writer.WriteAsync(raw, 0, raw.Length, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
new file mode 100644
index 0000000..127dec6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/IntStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for a single integer; each operation forwards to the matching Int32 call on the reader or writer.
+    /// </summary>
+    public sealed class IntStreamingCodec : IStreamingCodec<int>
+    {
+        /// <summary>Injectable constructor; private, so the class has no public constructor.</summary>
+        [Inject]
+        private IntStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one integer from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The integer read from the reader</returns>
+        public int Read(IDataReader reader)
+        {
+            int value = reader.ReadInt32();
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one integer to the writer.
+        /// </summary>
+        /// <param name="obj">The integer to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(int obj, IDataWriter writer)
+        {
+            writer.WriteInt32(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously reads one integer from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token forwarded to the reader</param>
+        /// <returns>The integer read from the reader</returns>
+        public async Task<int> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int value = await reader.ReadInt32Async(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously writes one integer to the writer.
+        /// </summary>
+        /// <param name="obj">The integer to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token forwarded to the writer</param>
+        public async Task WriteAsync(int obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteInt32Async(obj, token);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e70bb56c/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
new file mode 100644
index 0000000..8bbfb8e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs
+{
+    /// <summary>
+    /// Streaming codec for a string; each operation forwards to the matching string call on the reader or writer.
+    /// </summary>
+    public sealed class StringStreamingCodec : IStreamingCodec<string>
+    {
+        /// <summary>Injectable constructor; private, so the class has no public constructor.</summary>
+        [Inject]
+        private StringStreamingCodec()
+        {
+        }
+
+        /// <summary>
+        /// Reads one string from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The string read from the reader</returns>
+        public string Read(IDataReader reader)
+        {
+            string value = reader.ReadString();
+            return value;
+        }
+
+        /// <summary>
+        /// Writes one string to the writer.
+        /// </summary>
+        /// <param name="obj">The string to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(string obj, IDataWriter writer)
+        {
+            writer.WriteString(obj);
+        }
+
+        /// <summary>
+        /// Asynchronously reads one string from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token forwarded to the reader</param>
+        /// <returns>The string read from the reader</returns>
+        public async Task<string> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            string value = await reader.ReadStringAsync(token);
+            return value;
+        }
+
+        /// <summary>
+        /// Asynchronously writes one string to the writer.
+        /// </summary>
+        /// <param name="obj">The string to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token forwarded to the writer</param>
+        public async Task WriteAsync(string obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteStringAsync(obj, token);
+        }
+    }
+}