You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/11/03 04:15:28 UTC
incubator-reef git commit: [REEF-576] Introduce IFileSystem in
IUpdateFunction to write results This addressed the issue by * Creating an
interface IIMRUResultHandler that specifies what to do with the result. It is
injected in UpdateTaskHost * allowing
Repository: incubator-reef
Updated Branches:
refs/heads/master f5ee672c8 -> 7d1ccf1ef
[REEF-576] Introduce IFileSystem in IUpdateFunction to write results
This addressed the issue by
* Creating an interface IIMRUResultHandler that specifies what to do with the result. It is injected in UpdateTaskHost
* allowing users to specify the Result Handler using IMRUJobDefinition interface
* providing two common implementations: DeafultResultHandler (do nothing with result), WriteResuktHandler (Write result to distributed or local filesystem usinf IFilesystem interface)
* updating MapperCount example to enable it to write output
JIRA:
[REEF-576](https://issues.apache.org/jira/browse/REEF-576)
This closes #576
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/7d1ccf1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/7d1ccf1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/7d1ccf1e
Branch: refs/heads/master
Commit: 7d1ccf1ef0ddecabd4f1d98f1db0a432b3a06115
Parents: f5ee672
Author: Dhruv <dh...@apache.org>
Authored: Tue Oct 20 13:18:13 2015 -0700
Committer: Julia Wang <ju...@apache.org>
Committed: Mon Nov 2 18:58:36 2015 -0800
----------------------------------------------------------------------
.../MapperCount/MapperCount.cs | 18 ++-
.../OnREEFIMRURunTimeConfiguration.cs | 4 +-
lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs | 18 ++-
.../MapperCountTest.cs | 2 +-
.../API/IIMRUResultHandler.cs | 39 ++++++
.../API/IMRUJobDefinition.cs | 13 ++
.../API/IMRUJobDefinitionBuilder.cs | 14 +++
.../OnREEF/Client/REEFIMRUClient.cs | 2 +
.../OnREEF/Driver/ConfigurationManager.cs | 29 ++++-
.../OnREEF/Driver/IMRUDriver.cs | 23 +++-
.../OnREEF/IMRUTasks/UpdateTaskHost.cs | 23 ++--
.../SerializedResultHandlerConfiguration.cs | 26 ++++
.../ResultHandler/DefaultResultHandler.cs | 51 ++++++++
.../ResultHandler/ResultOutputLocation.cs | 26 ++++
.../OnREEF/ResultHandler/WriteResultHandler.cs | 119 +++++++++++++++++++
.../Org.Apache.REEF.IMRU.csproj | 5 +
.../HDFSConfigurationWithoutDriverBinding.cs | 62 ++++++++++
.../Org.Apache.REEF.IO.csproj | 1 +
18 files changed, 449 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs
index 9713b6f..255345a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs
@@ -17,10 +17,16 @@
* under the License.
*/
+using System;
using System.Linq;
using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IMRU.OnREEF.ResultHandler;
+using Org.Apache.REEF.IO.FileSystem.Local;
using Org.Apache.REEF.IO.PartitionedData.Random;
using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
@@ -43,7 +49,7 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount
/// Runs the actual mapper count job
/// </summary>
/// <returns>The number of MapFunction instances that are part of the job.</returns>
- public int Run(int numberofMappers)
+ public int Run(int numberofMappers, string outputFile, IConfiguration fileSystemConfig)
{
var results = _imruClient.Submit<int, int, int>(
new IMRUJobDefinitionBuilder()
@@ -66,8 +72,16 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount
GenericType<IntSumReduceFunction>.Class)
.Build())
.SetPartitionedDatasetConfiguration(
- RandomInputDataConfiguration.ConfigurationModule.Set(RandomInputDataConfiguration.NumberOfPartitions,
+ RandomInputDataConfiguration.ConfigurationModule.Set(
+ RandomInputDataConfiguration.NumberOfPartitions,
numberofMappers.ToString()).Build())
+ .SetResultHandlerConfiguration(
+ TangFactory.GetTang()
+ .NewConfigurationBuilder(fileSystemConfig)
+ .BindImplementation(GenericType<IIMRUResultHandler<int>>.Class,
+ GenericType<WriteResultHandler<int>>.Class)
+ .BindNamedParameter(typeof(ResultOutputLocation), outputFile)
+ .Build())
.SetMapTaskCores(2)
.SetUpdateTaskCores(3)
.SetJobName("MapperCount")
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
index c13efa0..be13e68 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/OnREEFIMRURunTimeConfiguration.cs
@@ -73,11 +73,9 @@ namespace Org.Apache.REEF.IMRU.Examples
IConfiguration imruClientConfig =
REEFIMRUClientConfiguration.ConfigurationModule.Build();
- var fileSystemConfig = HadoopFileSystemConfiguration.ConfigurationModule.Build();
-
IConfiguration runtimeConfig =
YARNClientConfiguration.ConfigurationModule.Build();
- return Configurations.Merge(runtimeConfig, imruClientConfig, fileSystemConfig);
+ return Configurations.Merge(runtimeConfig, imruClientConfig);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index 9018473..baec036 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -21,10 +21,12 @@ using System;
using System.Globalization;
using System.Linq;
using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Yarn;
using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using Org.Apache.REEF.IO.FileSystem.Hadoop;
+using Org.Apache.REEF.IO.FileSystem.Local;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Wake.Remote.Parameters;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IMRU.Examples
@@ -36,24 +38,27 @@ namespace Org.Apache.REEF.IMRU.Examples
{
private static readonly Logger Logger = Logger.GetLogger(typeof(Run));
- private static void RunMapperTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes)
+ private static void RunMapperTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, string filename)
{
IInjector injector;
+ IConfiguration fileSystemConfig;
if (!runOnYarn)
{
injector =
TangFactory.GetTang()
.NewInjector(OnREEFIMRURunTimeConfiguration<int, int, int>.GetLocalIMRUConfiguration(numNodes), tcpPortConfig);
+ fileSystemConfig = LocalFileSystemConfiguration.ConfigurationModule.Build();
}
else
{
injector = TangFactory.GetTang()
.NewInjector(OnREEFIMRURunTimeConfiguration<int, int, int>.GetYarnIMRUConfiguration(), tcpPortConfig);
+ fileSystemConfig = HDFSConfigurationWithoutDriverBinding.ConfigurationModule.Build();
}
var mapperCountExample = injector.GetInstance<MapperCount.MapperCount>();
- mapperCountExample.Run(numNodes - 1);
+ mapperCountExample.Run(numNodes - 1, filename, fileSystemConfig);
}
private static void RunBroadcastReduceTest(IConfiguration tcpPortConfig, bool runOnYarn, int numNodes, string[] args)
@@ -114,6 +119,7 @@ namespace Org.Apache.REEF.IMRU.Examples
int numNodes = 2;
int startPort = 8900;
int portRange = 1000;
+ string resultFilename = " ";
if (args != null)
{
@@ -154,7 +160,11 @@ namespace Org.Apache.REEF.IMRU.Examples
{
case "mappercount":
Logger.Log(Level.Info, "Running Mapper count");
- RunMapperTest(tcpPortConfig, runOnYarn, numNodes);
+ if (args.Length > 5)
+ {
+ resultFilename = args[5];
+ }
+ RunMapperTest(tcpPortConfig, runOnYarn, numNodes, resultFilename);
Logger.Log(Level.Info, "Done Running Mapper count");
return;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs
index 2874e13..9599c1d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs
@@ -46,7 +46,7 @@ namespace Org.Apache.REEF.IMRU.Tests
.Build()
)
.GetInstance<MapperCount>();
- var result = tested.Run(NumberOfMappers);
+ var result = tested.Run(NumberOfMappers, "", TangFactory.GetTang().NewConfigurationBuilder().Build());
Assert.AreEqual(NumberOfMappers, result, "The result of the run should be the number of Mappers.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUResultHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUResultHandler.cs
new file mode 100644
index 0000000..c0276c2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUResultHandler.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Common.Attributes;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+ /// <summary>
+ /// Interface defining how to handle the output of Update function in IMRU
+ /// </summary>
+ /// <typeparam name="T">Result type</typeparam>
+ [Unstable("0.14", "This API will change after introducing proper API for output in REEF.IO")]
+ public interface IIMRUResultHandler<in T> : IDisposable
+ {
+ /// <summary>
+ /// Handles the result of IMRU updata function.
+ /// For example, do nothing, write to file etc.
+ /// </summary>
+ /// <param name="result">The output of IMRU update function</param>
+ void HandleResult(T result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index ec523e9..bccc7b2 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -37,6 +37,7 @@ namespace Org.Apache.REEF.IMRU.API
private readonly IConfiguration _mapOutputPipelineDataConverterConfiguration;
private readonly IConfiguration _mapInputPipelineDataConverterConfiguration;
private readonly IConfiguration _partitionedDatasetConfiguration;
+ private readonly IConfiguration _resultHandlerConfiguration;
private readonly int _numberOfMappers;
private readonly int _memoryPerMapper;
private readonly int _updateTaskMemory;
@@ -60,6 +61,7 @@ namespace Org.Apache.REEF.IMRU.API
/// PipelineDataConverter for TMapInput</param>
/// <param name="partitionedDatasetConfiguration">Configuration of partitioned
/// dataset</param>
+ /// <param name="resultHandlerConfiguration">Configuration of result handler</param>
/// <param name="perMapConfigGeneratorConfig">Per mapper configuration</param>
/// <param name="numberOfMappers">Number of mappers</param>
/// <param name="memoryPerMapper">Per Mapper memory.</param>
@@ -77,6 +79,7 @@ namespace Org.Apache.REEF.IMRU.API
IConfiguration mapOutputPipelineDataConverterConfiguration,
IConfiguration mapInputPipelineDataConverterConfiguration,
IConfiguration partitionedDatasetConfiguration,
+ IConfiguration resultHandlerConfiguration,
ISet<IConfiguration> perMapConfigGeneratorConfig,
int numberOfMappers,
int memoryPerMapper,
@@ -102,6 +105,7 @@ namespace Org.Apache.REEF.IMRU.API
_updateTaskCores = updateTaskCores;
_perMapConfigGeneratorConfig = perMapConfigGeneratorConfig;
_invokeGC = invokeGC;
+ _resultHandlerConfiguration = resultHandlerConfiguration;
}
/// <summary>
@@ -235,5 +239,14 @@ namespace Org.Apache.REEF.IMRU.API
{
get { return _invokeGC; }
}
+
+ /// <summary>
+ /// Configuration of the result handler
+ /// </summary>
+ /// <returns></returns>
+ internal IConfiguration ResultHandlerConfiguration
+ {
+ get { return _resultHandlerConfiguration; }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index 2731e1b..c55877a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -52,6 +52,7 @@ namespace Org.Apache.REEF.IMRU.API
private IConfiguration _mapOutputPipelineDataConverterConfiguration;
private IConfiguration _mapInputPipelineDataConverterConfiguration;
private IConfiguration _partitionedDatasetConfiguration;
+ private IConfiguration _resultHandlerConfiguration;
private readonly ISet<IConfiguration> _perMapConfigGeneratorConfig;
private bool _invokeGC;
@@ -66,6 +67,7 @@ namespace Org.Apache.REEF.IMRU.API
_mapInputPipelineDataConverterConfiguration = EmptyConfiguration;
_mapOutputPipelineDataConverterConfiguration = EmptyConfiguration;
_partitionedDatasetConfiguration = EmptyConfiguration;
+ _resultHandlerConfiguration = EmptyConfiguration;
_memoryPerMapper = 512;
_updateTaskMemory = 512;
_coresPerMapper = 1;
@@ -248,6 +250,17 @@ namespace Org.Apache.REEF.IMRU.API
}
/// <summary>
+ /// Sets Result handler Configuration
+ /// </summary>
+ /// <param name="resultHandlerConfig">Result handler config</param>
+ /// <returns></returns>
+ public IMRUJobDefinitionBuilder SetResultHandlerConfiguration(IConfiguration resultHandlerConfig)
+ {
+ _resultHandlerConfiguration = resultHandlerConfig;
+ return this;
+ }
+
+ /// <summary>
/// Whether to invoke Garbage Collector after each IMRU iteration
/// </summary>
/// <param name="invokeGC">variable telling whether to invoke or not</param>
@@ -307,6 +320,7 @@ namespace Org.Apache.REEF.IMRU.API
_mapOutputPipelineDataConverterConfiguration,
_mapInputPipelineDataConverterConfiguration,
_partitionedDatasetConfiguration,
+ _resultHandlerConfiguration,
_perMapConfigGeneratorConfig,
_numberOfMappers,
_memoryPerMapper,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index d8cdf9f..82044a8 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -128,6 +128,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
_configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration))
.BindNamedParameter(typeof(SerializedReduceConfiguration),
_configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration))
+ .BindNamedParameter(typeof(SerializedResultHandlerConfiguration),
+ _configurationSerializer.ToString(jobDefinition.ResultHandlerConfiguration))
.BindNamedParameter(typeof(MemoryPerMapper),
jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture))
.BindNamedParameter(typeof(MemoryForUpdateTask),
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
index 8c0188c..682ee34 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
@@ -39,6 +39,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
private readonly IConfiguration _updateFunctionConfiguration;
private readonly IConfiguration _mapOutputPipelineDataConverterConfiguration;
private readonly IConfiguration _mapInputPipelineDataConverterConfiguration;
+ private readonly IConfiguration _resultHandlerConfiguration;
[Inject]
private ConfigurationManager(
@@ -48,14 +49,17 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
[Parameter(typeof(SerializedUpdateConfiguration))] string updateConfig,
[Parameter(typeof(SerializedMapInputCodecConfiguration))] string mapInputCodecConfig,
[Parameter(typeof(SerializedUpdateFunctionCodecsConfiguration))] string updateFunctionCodecsConfig,
- [Parameter(typeof(SerializedMapOutputPipelineDataConverterConfiguration))] string mapOutputPipelineDataConverterConfiguration,
- [Parameter(typeof(SerializedMapInputPipelineDataConverterConfiguration))] string mapInputPipelineDataConverterConfiguration)
+ [Parameter(typeof(SerializedMapOutputPipelineDataConverterConfiguration))] string
+ mapOutputPipelineDataConverterConfiguration,
+ [Parameter(typeof(SerializedMapInputPipelineDataConverterConfiguration))] string
+ mapInputPipelineDataConverterConfiguration,
+ [Parameter(typeof(SerializedResultHandlerConfiguration))] string resultHandlerConfiguration)
{
try
{
_mapFunctionConfiguration = configurationSerializer.FromString(mapConfig);
}
- catch(Exception e)
+ catch (Exception e)
{
Exceptions.Throw(e, "Unable to deserialize map function configuration", Logger);
}
@@ -115,6 +119,17 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
Exceptions.Throw(e, "Unable to deserialize map input pipeline data converter configuration", Logger);
}
+
+ try
+ {
+ _resultHandlerConfiguration =
+ configurationSerializer.FromString(resultHandlerConfiguration);
+ Logger.Log(Level.Verbose,"Serialized result handler is " + resultHandlerConfiguration);
+ }
+ catch (Exception e)
+ {
+ Exceptions.Throw(e, "Unable to deserialize map input pipeline data converter configuration", Logger);
+ }
}
/// <summary>
@@ -173,5 +188,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
get { return _mapInputPipelineDataConverterConfiguration; }
}
+
+ /// <summary>
+ /// Configuration of Result handler
+ /// </summary>
+ internal IConfiguration ResultHandlerConfiguration
+ {
+ get { return _resultHandlerConfiguration; }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index dc8d4c2..8fb59b5 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -18,7 +18,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Globalization;
using System.Linq;
using System.Threading;
using Org.Apache.REEF.Common.Tasks;
@@ -30,6 +29,7 @@ using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IMRU.OnREEF.ResultHandler;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -37,13 +37,13 @@ using Org.Apache.REEF.Network.Group.Pipelining;
using Org.Apache.REEF.Network.Group.Pipelining.Impl;
using Org.Apache.REEF.Network.Group.Topology;
using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
@@ -168,11 +168,28 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
.Set(TaskConfiguration.Task,
GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class)
.Build(),
- _configurationManager.UpdateFunctionConfiguration
+ _configurationManager.UpdateFunctionConfiguration,
+ _configurationManager.ResultHandlerConfiguration
})
.BindNamedParameter(typeof (InvokeGC), _invokeGC.ToString())
.Build();
+ try
+ {
+ TangFactory.GetTang()
+ .NewInjector(partialTaskConf, _configurationManager.UpdateFunctionCodecsConfiguration)
+ .GetInstance<IIMRUResultHandler<TResult>>();
+ }
+ catch(InjectionException)
+ {
+ partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(partialTaskConf)
+ .BindImplementation(GenericType<IIMRUResultHandler<TResult>>.Class,
+ GenericType<DefaultResultHandler<TResult>>.Class)
+ .Build();
+ Logger.Log(Level.Warning,
+ "User has not given any way to handle IMRU result, defaulting to ignoring it");
+ }
+
_commGroup.AddTask(IMRUConstants.UpdateTaskName);
_groupCommTaskStarter.QueueTask(partialTaskConf, activeContext);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index a0d5af3..ceaf8f9 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -16,7 +16,6 @@
// under the License.
using System;
-using System.Diagnostics;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.OnREEF.Driver;
@@ -43,23 +42,28 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
private readonly IBroadcastSender<MapInputWithControlMessage<TMapInput>> _dataAndControlMessageSender;
private readonly IUpdateFunction<TMapInput, TMapOutput, TResult> _updateTask;
private readonly bool _invokeGC;
+ private readonly IIMRUResultHandler<TResult> _resultHandler;
/// <summary>
/// </summary>
/// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param>
/// <param name="groupCommunicationsClient">Used to setup the communications.</param>
+ /// <param name="resultHandler">Result handler</param>
/// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param>
[Inject]
private UpdateTaskHost(
IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
IGroupCommClient groupCommunicationsClient,
- [Parameter(typeof(InvokeGC))] bool invokeGC)
+ IIMRUResultHandler<TResult> resultHandler,
+ [Parameter(typeof (InvokeGC))] bool invokeGC)
{
_updateTask = updateTask;
var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
- _dataAndControlMessageSender = cg.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
+ _dataAndControlMessageSender =
+ cg.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
_dataReceiver = cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName);
_invokeGC = invokeGC;
+ _resultHandler = resultHandler;
}
/// <summary>
@@ -70,9 +74,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
public byte[] Call(byte[] memento)
{
var updateResult = _updateTask.Initialize();
+ int iterNo = 0;
while (updateResult.HasMapInput)
- {
+ {
+ iterNo++;
+
using (
var message = new MapInputWithControlMessage<TMapInput>(updateResult.MapInput,
MapControlMessage.AnotherRound))
@@ -93,7 +100,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
if (updateResult.HasResult)
{
- // TODO[REEF-576]: Emit output somewhere.
+ _resultHandler.HandleResult(updateResult.Result);
}
}
@@ -101,11 +108,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
_dataAndControlMessageSender.Send(stopMessage);
- if (updateResult.HasResult)
- {
- // TODO[REEF-576]: Emit output somewhere.
- }
-
+ _resultHandler.Dispose();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedResultHandlerConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedResultHandlerConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedResultHandlerConfiguration.cs
new file mode 100644
index 0000000..b83a6e4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedResultHandlerConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for result handler.")]
+ internal sealed class SerializedResultHandlerConfiguration : Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/DefaultResultHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/DefaultResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/DefaultResultHandler.cs
new file mode 100644
index 0000000..b694616
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/DefaultResultHandler.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Common.Attributes;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.ResultHandler
+{
+ [Unstable("0.14", "This API will change after introducing proper API for output in REEF.IO")]
+ internal class DefaultResultHandler<TResult> : IIMRUResultHandler<TResult>
+ {
+ [Inject]
+ private DefaultResultHandler()
+ {
+ }
+
+ /// <summary>
+ /// Specifies how to handle the IMRU results from UpdateTask. Does nothing
+ /// </summary>
+ /// <param name="result">The result of IMRU</param>
+ public void HandleResult(TResult result)
+ {
+ }
+
+ /// <summary>
+ /// Handles what to do on completion
+ /// In this case do nothing
+ /// </summary>
+ public void Dispose()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/ResultOutputLocation.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/ResultOutputLocation.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/ResultOutputLocation.cs
new file mode 100644
index 0000000..0915643
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/ResultOutputLocation.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.ResultHandler
+{
+ [NamedParameter("output location for writing result", "resultfile", " ")]
+ public sealed class ResultOutputLocation : Name<string>
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs
new file mode 100644
index 0000000..0e1162d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/ResultHandler/WriteResultHandler.cs
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using Org.Apache.REEF.Common.Attributes;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IO.FileSystem;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.ResultHandler
+{
+ /// <summary>
+ /// Writes IMRU result from Update task to a file
+ /// </summary>
+ /// <typeparam name="TResult"></typeparam>
+ [Unstable("0.14", "This API will change after introducing proper API for output in REEF.IO")]
+ public class WriteResultHandler<TResult> : IIMRUResultHandler<TResult>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof (WriteResultHandler<>));
+
+ private readonly IStreamingCodec<TResult> _resultCodec;
+ private readonly IFileSystem _fileSystem;
+ private readonly string _remoteFileName;
+ private readonly string _localFilename;
+
+ [Inject]
+ private WriteResultHandler(
+ IStreamingCodec<TResult> resultCodec,
+ IFileSystem fileSystem,
+ [Parameter(typeof(ResultOutputLocation))] string fileName)
+ {
+ _resultCodec = resultCodec;
+ _fileSystem = fileSystem;
+ _remoteFileName = fileName;
+ _localFilename = GenerateLocalFilename();
+ }
+
+ /// <summary>
+ /// Specifies how to handle the IMRU results from the Update Task. Writes it to a location
+ /// </summary>
+ /// <param name="value">The result of IMRU from the UpdateTask</param>
+ public void HandleResult(TResult value)
+ {
+ if (string.IsNullOrWhiteSpace(_remoteFileName))
+ {
+ return;
+ }
+
+ WriteOutput(value);
+ }
+
+ /// <summary>
+ /// Handles what to do on completion
+ /// In this case write to remote location
+ /// </summary>
+ public void Dispose()
+ {
+ if (string.IsNullOrWhiteSpace(_remoteFileName))
+ {
+ return;
+ }
+
+ Uri remoteUri = _fileSystem.CreateUriForPath(_remoteFileName);
+
+ if (_fileSystem.Exists(remoteUri))
+ {
+ Exceptions.Throw(
+ new Exception(string.Format("Output Uri: {0} already exists", remoteUri)), Logger);
+ }
+
+ _fileSystem.CopyFromLocal(_localFilename, remoteUri);
+ }
+
+ private string GenerateLocalFilename()
+ {
+ string localFileFolder = Path.GetTempPath() + "-partition-" + Guid.NewGuid().ToString("N").Substring(0, 8);
+ Directory.CreateDirectory(localFileFolder);
+ var localFilePath = String.Format("{0}\\{1}", localFileFolder, Guid.NewGuid().ToString("N").Substring(0, 8));
+
+ if (File.Exists(localFilePath))
+ {
+ File.Delete(localFilePath);
+ Logger.Log(Level.Warning, "localFile for output already exists, deleting it: " + localFilePath);
+ }
+
+ return localFilePath;
+ }
+
+ private void WriteOutput(TResult result)
+ {
+ using (FileStream stream = new FileStream(_localFilename, FileMode.Append, FileAccess.Write))
+ {
+ StreamDataWriter writer = new StreamDataWriter(stream);
+ _resultCodec.Write(result, writer);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 53a862e..0fc67d9 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -45,6 +45,7 @@ under the License.
</ItemGroup>
<ItemGroup>
<Compile Include="API\IIMRUClient.cs" />
+ <Compile Include="API\IIMRUResultHandler.cs" />
<Compile Include="API\IMapFunction.cs" />
<Compile Include="API\IMRUCodecConfiguration.cs" />
<Compile Include="API\IMRUPipelineDataConverterConfiguration.cs" />
@@ -84,6 +85,7 @@ under the License.
<Compile Include="OnREEF\Parameters\CoresPerMapper.cs" />
<Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" />
<Compile Include="OnREEF\Parameters\MemoryPerMapper.cs" />
+ <Compile Include="OnREEF\Parameters\SerializedResultHandlerConfiguration.cs" />
<Compile Include="OnREEF\Parameters\SerializedMapConfiguration.cs" />
<Compile Include="OnREEF\Parameters\SerializedMapInputCodecConfiguration.cs" />
<Compile Include="OnREEF\Parameters\SerializedMapInputPipelineDataConverterConfiguration.cs" />
@@ -91,6 +93,9 @@ under the License.
<Compile Include="OnREEF\Parameters\SerializedReduceConfiguration.cs" />
<Compile Include="OnREEF\Parameters\SerializedUpdateConfiguration.cs" />
<Compile Include="OnREEF\Parameters\SerializedUpdateFunctionCodecsConfiguration.cs" />
+ <Compile Include="OnREEF\ResultHandler\DefaultResultHandler.cs" />
+ <Compile Include="OnREEF\ResultHandler\ResultOutputLocation.cs" />
+ <Compile Include="OnREEF\ResultHandler\WriteResultHandler.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSConfigurationWithoutDriverBinding.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSConfigurationWithoutDriverBinding.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSConfigurationWithoutDriverBinding.cs
new file mode 100644
index 0000000..9357adb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Hadoop/HDFSConfigurationWithoutDriverBinding.cs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Client.Parameters;
+using Org.Apache.REEF.IO.FileSystem.Hadoop.Parameters;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IO.FileSystem.Hadoop
+{
+ /// <summary>
+ /// Configuration Module for the (command based) Hadoop file system implementation of IFileSystem.
+ /// </summary>
+ /// <remarks>
+ /// This IFileSystem implementation is enormously slow as it spawns a new JVM per API call. Avoid if you have better means
+ /// of file system access.
+ /// Also, Stream-based operations are not supported.
+ /// </remarks>
+ public sealed class HDFSConfigurationWithoutDriverBinding : ConfigurationModuleBuilder
+ {
+ /// <summary>
+ /// The number of times each HDFS command will be retried. Defaults to 3.
+ /// </summary>
+ public static readonly OptionalParameter<int> CommandRetries = new OptionalParameter<int>();
+
+ /// <summary>
+ /// The timeout (in milliseconds) for HDFS commands. Defaults to 300000 (5 minutes).
+ /// </summary>
+ public static readonly OptionalParameter<int> CommandTimeOut = new OptionalParameter<int>();
+
+ /// <summary>
+ /// The folder in which Hadoop is installed. Defaults to %HADOOP_HOME%.
+ /// </summary>
+ public static readonly OptionalParameter<string> HadoopHome = new OptionalParameter<string>();
+
+ /// <summary>
+ /// Set all the parameters needed for injecting HadoopFileSystemConfigurationProvider.
+ /// </summary>
+ public static readonly ConfigurationModule ConfigurationModule = new HDFSConfigurationWithoutDriverBinding()
+ .BindImplementation(GenericType<IFileSystem>.Class, GenericType<HadoopFileSystem>.Class)
+ .BindNamedParameter(GenericType<NumberOfRetries>.Class, CommandRetries)
+ .BindNamedParameter(GenericType<CommandTimeOut>.Class, CommandTimeOut)
+ .BindNamedParameter(GenericType<HadoopHome>.Class, HadoopHome)
+ .Build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7d1ccf1e/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
index 6264ea4..9af9d24 100644
--- a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
+++ b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
@@ -42,6 +42,7 @@ under the License.
</ItemGroup>
<ItemGroup>
<Compile Include="FileSystem\Hadoop\CommandResult.cs" />
+ <Compile Include="FileSystem\Hadoop\HDFSConfigurationWithoutDriverBinding.cs" />
<Compile Include="FileSystem\Hadoop\HadoopFileSystemConfiguration.cs" />
<Compile Include="FileSystem\Hadoop\HadoopFileSystemConfigurationProvider.cs" />
<Compile Include="FileSystem\Hadoop\HDFSCommandRunner.cs" />