You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/02/20 03:15:57 UTC
reef git commit: [REEF-1206] Update Deserializer API to allow
deserialize from remote files directly
Repository: reef
Updated Branches:
refs/heads/master 3818bf454 -> dc78d381f
[REEF-1206] Update Deserializer API to allow deserialize from remote files directly
* Add a new Deserializer API that takes a set of file paths
* Deprecate the old folder-based API
* Add a named parameter, CopyToLocal
* Set the driver memory for IMRU job submission
JIRA:
[REEF-1206](https://issues.apache.org/jira/browse/REEF-1206)
Pull request:
This closes #843
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dc78d381
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dc78d381
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dc78d381
Branch: refs/heads/master
Commit: dc78d381f98c4a3d5559972bed07e2af49ba621d
Parents: 3818bf4
Author: Julia Wang <ju...@microsoft.com>
Authored: Thu Jan 28 17:23:22 2016 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Fri Feb 19 18:13:24 2016 -0800
----------------------------------------------------------------------
.../OnREEF/Client/REEFIMRUClient.cs | 1 +
.../HadoopFileInputPartitionTest.cs | 18 +++++++++++
.../TestFilePartitionInputDataSet.cs | 34 ++++++++++++++++++++
.../Org.Apache.REEF.IO.csproj | 1 +
.../FileSystem/FileSystemInputPartition.cs | 30 ++++++++++-------
.../FileSystem/IFileDeSerializer.cs | 14 ++++++++
.../FileSystem/Parameters/CopyToLocal.cs | 29 +++++++++++++++++
7 files changed, 116 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 8e33c82..672df9a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -146,6 +146,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
.AddDriverConfiguration(imruDriverConfiguration)
.AddGlobalAssemblyForType(typeof(IMRUDriver<TMapInput, TMapOutput, TResult>))
.SetJobIdentifier(jobDefinition.JobName)
+ .SetDriverMemory(5000)
.Build();
_jobSubmissionResult = _reefClient.SubmitAndGetJobStatus(imruJobSubmission);
http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
index 10f5cbc..a1e5f84 100644
--- a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
+++ b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
@@ -23,6 +23,7 @@ using Org.Apache.REEF.IO.FileSystem;
using Org.Apache.REEF.IO.FileSystem.Hadoop;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.IO.PartitionedData.FileSystem;
+using Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -45,6 +46,7 @@ namespace Org.Apache.REEF.IO.TestClient
var serializerConf = TangFactory.GetTang().NewConfigurationBuilder()
.BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
GenericType<ByteSerializer>.Class)
+ .BindNamedParam<CopyToLocal, bool>("true")
.Build();
var serializerConfString = (new AvroConfigurationSerializer()).ToString(serializerConf);
@@ -147,10 +149,26 @@ namespace Org.Apache.REEF.IO.TestClient
/// </summary>
/// <param name="fileFolder"></param>
/// <returns></returns>
+ [Obsolete("Remove after 0.14")]
public IEnumerable<byte> Deserialize(string fileFolder)
{
+ var files = new HashSet<string>();
foreach (var f in Directory.GetFiles(fileFolder))
{
+ files.Add(f);
+ }
+ return Deserialize(files);
+ }
+
+ /// <summary>
+ /// Read bytes from all the files in the set and return one by one
+ /// </summary>
+ /// <param name="filePaths"></param>
+ /// <returns></returns>
+ public IEnumerable<byte> Deserialize(ISet<string> filePaths)
+ {
+ foreach (var f in filePaths)
+ {
using (FileStream stream = File.Open(f, FileMode.Open))
{
BinaryReader reader = new BinaryReader(stream);
http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
index 1666777..5d829c3 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
@@ -287,6 +287,7 @@ namespace Org.Apache.REEF.IO.Tests
.BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(
GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
GenericType<ByteSerializer>.Class)
+ .BindNamedParam<CopyToLocal, bool>("true")
.Build();
return (new AvroConfigurationSerializer()).ToString(serializerConf);
}
@@ -297,6 +298,7 @@ namespace Org.Apache.REEF.IO.Tests
.BindImplementation<IFileDeSerializer<IEnumerable<Row>>, RowSerializer>(
GenericType<IFileDeSerializer<IEnumerable<Row>>>.Class,
GenericType<RowSerializer>.Class)
+ .BindNamedParam<CopyToLocal, bool>("true")
.Build();
return (new AvroConfigurationSerializer()).ToString(serializerConf);
}
@@ -314,10 +316,26 @@ namespace Org.Apache.REEF.IO.Tests
/// </summary>
/// <param name="fileFolder"></param>
/// <returns></returns>
+ [Obsolete("Remove after 0.14")]
public IEnumerable<byte> Deserialize(string fileFolder)
{
+ var files = new HashSet<string>();
foreach (var f in Directory.GetFiles(fileFolder))
{
+ files.Add(f);
+ }
+ return Deserialize(files);
+ }
+
+ /// <summary>
+ /// Enumerate all the files in the set and return each byte read
+ /// </summary>
+ /// <param name="filePaths"></param>
+ /// <returns></returns>
+ public IEnumerable<byte> Deserialize(ISet<string> filePaths)
+ {
+ foreach (var f in filePaths)
+ {
using (FileStream stream = File.Open(f, FileMode.Open))
{
BinaryReader reader = new BinaryReader(stream);
@@ -357,10 +375,26 @@ namespace Org.Apache.REEF.IO.Tests
/// </summary>
/// <param name="fileFolder"></param>
/// <returns></returns>
+ [Obsolete("Remove after 0.14")]
public IEnumerable<Row> Deserialize(string fileFolder)
{
+ ISet<string> files = new HashSet<string>();
foreach (var f in Directory.GetFiles(fileFolder))
{
+ files.Add(f);
+ }
+ return Deserialize(files);
+ }
+
+ /// <summary>
+ /// Read all the files in the set and return the rows read one by one
+ /// </summary>
+ /// <param name="filePaths"></param>
+ /// <returns></returns>
+ public IEnumerable<Row> Deserialize(ISet<string> filePaths)
+ {
+ foreach (var f in filePaths)
+ {
using (FileStream stream = File.Open(f, FileMode.Open))
{
BinaryReader reader = new BinaryReader(stream);
http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
index 016cb7c..fc321b9 100644
--- a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
+++ b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
@@ -100,6 +100,7 @@ under the License.
<Compile Include="FileSystem\Local\LocalFileSystemConfiguration.cs" />
<Compile Include="PartitionedData\FileSystem\FileSystemInputPartition.cs" />
<Compile Include="PartitionedData\FileSystem\FileInputPartitionDescriptor.cs" />
+ <Compile Include="PartitionedData\FileSystem\Parameters\CopyToLocal.cs" />
<Compile Include="PartitionedData\FileSystem\Parameters\FileDeSerializerConfigString.cs" />
<Compile Include="PartitionedData\FileSystem\IFileDeSerializer.cs" />
<Compile Include="PartitionedData\FileSystem\FileSystemPartitionInputDataSet.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
index aba657a..cccc2be 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
@@ -38,13 +38,15 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
private readonly IFileDeSerializer<T> _fileSerializer;
private readonly ISet<string> _filePaths;
private bool _isInitialized;
+ private readonly bool _copyToLocal;
private readonly object _lock = new object();
- private string _localFileFolder;
private readonly ITempFileCreator _tempFileCreator;
+ private readonly ISet<string> _localFiles = new HashSet<string>();
[Inject]
private FileSystemInputPartition([Parameter(typeof(PartitionId))] string id,
[Parameter(typeof(FilePathsInInputPartition))] ISet<string> filePaths,
+ [Parameter(typeof(CopyToLocal))] bool copyToLocal,
IFileSystem fileSystem,
ITempFileCreator tempFileCreator,
IFileDeSerializer<T> fileSerializer)
@@ -55,6 +57,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
_filePaths = filePaths;
_tempFileCreator = tempFileCreator;
_isInitialized = false;
+ _copyToLocal = copyToLocal;
}
public string Id
@@ -82,17 +85,22 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
/// <returns></returns>
public T GetPartitionHandle()
{
- if (!_isInitialized)
+ if (_copyToLocal)
{
- Initialize();
+ if (!_isInitialized)
+ {
+ Initialize();
+ }
+
+ return _fileSerializer.Deserialize(_localFiles);
}
- return _fileSerializer.Deserialize(_localFileFolder);
+ return _fileSerializer.Deserialize(_filePaths);
}
private void CopyFromRemote()
{
- _localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
- Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", _localFileFolder));
+ string localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
+ Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder));
foreach (var sourceFilePath in _filePaths)
{
@@ -104,7 +112,9 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
"Remote File {0} does not exists.", sourceUri));
}
- var localFilePath = _localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8);
+ var localFilePath = localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8);
+ _localFiles.Add(localFilePath);
+
Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "LocalFilePath {0}: ", localFilePath));
if (File.Exists(localFilePath))
{
@@ -134,14 +144,12 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
/// </summary>
public void Dispose()
{
- if (_localFileFolder != null)
+ if (_localFiles.Count > 0)
{
- foreach (var fileName in Directory.GetFiles(_localFileFolder))
+ foreach (var fileName in _localFiles)
{
File.Delete(fileName);
}
- Directory.Delete(_localFileFolder);
- _localFileFolder = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs
index 6759937..988b11d 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+using System;
+using System.Collections.Generic;
+
namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
{
/// <summary>
@@ -30,6 +33,17 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
/// </summary>
/// <param name="fileFolder"></param>
/// <returns></returns>
+ [Obsolete("Remove after 0.14")]
T Deserialize(string fileFolder);
+
+ /// <summary>
+ /// The input is a set of file paths.
+ /// The output is of type T which is defined by the client.
+ /// If there is any IO error, IOException could be thrown.
+ /// </summary>
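+ /// <param name="filePaths">The set of paths of the files to deserialize.</param>
+ /// <remarks>FileSystemInputPartition passes its local copies when CopyToLocal is true, otherwise the original file paths.</remarks>
+ /// <returns>The deserialized content, of the client-defined type T.</returns>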
+ T Deserialize(ISet<string> filePaths);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs
new file mode 100644
index 0000000..5a2da3e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters
+{
+ /// <summary>
+ /// The flag that specifies if the remote files need to be copied to local or not
+ /// </summary>
+ [NamedParameter(Documentation = "Specify if the remote files need to be copied to local", ShortName = "copyToLocal", DefaultValue = "true")]
+ internal sealed class CopyToLocal : Name<bool>
+ {
+ }
+}
\ No newline at end of file