You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/02/20 03:15:57 UTC

reef git commit: [REEF-1206] Update Deserializer API to allow deserializing from remote files directly

Repository: reef
Updated Branches:
  refs/heads/master 3818bf454 -> dc78d381f


[REEF-1206] Update Deserializer API to allow deserializing from remote files directly

 * Add a new Deserializer API
 * Deprecate the old API
 * Add a named parameter for CopyToLocal
 * Set driver memory for IMRU

JIRA:
  [REEF-1206](https://issues.apache.org/jira/browse/REEF-1206)

Pull request:
  This closes #843


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dc78d381
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dc78d381
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dc78d381

Branch: refs/heads/master
Commit: dc78d381f98c4a3d5559972bed07e2af49ba621d
Parents: 3818bf4
Author: Julia Wang <ju...@microsoft.com>
Authored: Thu Jan 28 17:23:22 2016 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Fri Feb 19 18:13:24 2016 -0800

----------------------------------------------------------------------
 .../OnREEF/Client/REEFIMRUClient.cs             |  1 +
 .../HadoopFileInputPartitionTest.cs             | 18 +++++++++++
 .../TestFilePartitionInputDataSet.cs            | 34 ++++++++++++++++++++
 .../Org.Apache.REEF.IO.csproj                   |  1 +
 .../FileSystem/FileSystemInputPartition.cs      | 30 ++++++++++-------
 .../FileSystem/IFileDeSerializer.cs             | 14 ++++++++
 .../FileSystem/Parameters/CopyToLocal.cs        | 29 +++++++++++++++++
 7 files changed, 116 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 8e33c82..672df9a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -146,6 +146,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                 .AddDriverConfiguration(imruDriverConfiguration)
                 .AddGlobalAssemblyForType(typeof(IMRUDriver<TMapInput, TMapOutput, TResult>))
                 .SetJobIdentifier(jobDefinition.JobName)
+                .SetDriverMemory(5000)
                 .Build();
 
             _jobSubmissionResult = _reefClient.SubmitAndGetJobStatus(imruJobSubmission);

http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
index 10f5cbc..a1e5f84 100644
--- a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
+++ b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
@@ -23,6 +23,7 @@ using Org.Apache.REEF.IO.FileSystem;
 using Org.Apache.REEF.IO.FileSystem.Hadoop;
 using Org.Apache.REEF.IO.PartitionedData;
 using Org.Apache.REEF.IO.PartitionedData.FileSystem;
+using Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -45,6 +46,7 @@ namespace Org.Apache.REEF.IO.TestClient
             var serializerConf = TangFactory.GetTang().NewConfigurationBuilder()
                 .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
                     GenericType<ByteSerializer>.Class)
+                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             var serializerConfString = (new AvroConfigurationSerializer()).ToString(serializerConf);
 
@@ -147,10 +149,26 @@ namespace Org.Apache.REEF.IO.TestClient
         /// </summary>
         /// <param name="fileFolder"></param>
         /// <returns></returns>
+        [Obsolete("Remove after 0.14")]
         public IEnumerable<byte> Deserialize(string fileFolder)
         {
+            var files = new HashSet<string>();
             foreach (var f in Directory.GetFiles(fileFolder))
             {
+                files.Add(f);
+            }
+            return Deserialize(files);
+        }
+
+        /// <summary>
+        /// Read bytes from all the files in the set and return one by one
+        /// </summary>
+        /// <param name="filePaths"></param>
+        /// <returns></returns>
+        public IEnumerable<byte> Deserialize(ISet<string> filePaths)
+        {
+            foreach (var f in filePaths)
+            {
                 using (FileStream stream = File.Open(f, FileMode.Open))
                 {
                     BinaryReader reader = new BinaryReader(stream);

http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
index 1666777..5d829c3 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
@@ -287,6 +287,7 @@ namespace Org.Apache.REEF.IO.Tests
                 .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(
                     GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
                     GenericType<ByteSerializer>.Class)
+                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             return (new AvroConfigurationSerializer()).ToString(serializerConf);
         }
@@ -297,6 +298,7 @@ namespace Org.Apache.REEF.IO.Tests
                 .BindImplementation<IFileDeSerializer<IEnumerable<Row>>, RowSerializer>(
                     GenericType<IFileDeSerializer<IEnumerable<Row>>>.Class,
                     GenericType<RowSerializer>.Class)
+                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             return (new AvroConfigurationSerializer()).ToString(serializerConf);
         }
@@ -314,10 +316,26 @@ namespace Org.Apache.REEF.IO.Tests
         /// </summary>
         /// <param name="fileFolder"></param>
         /// <returns></returns>
+        [Obsolete("Remove after 0.14")]
         public IEnumerable<byte> Deserialize(string fileFolder)
         {
+            var files = new HashSet<string>();
             foreach (var f in Directory.GetFiles(fileFolder))
             {
+                files.Add(f);
+            }
+            return Deserialize(files);
+        }
+
+        /// <summary>
+        /// Enumerate all the files in the set and return each byte read
+        /// </summary>
+        /// <param name="filePaths"></param>
+        /// <returns></returns>
+        public IEnumerable<byte> Deserialize(ISet<string> filePaths)
+        {
+            foreach (var f in filePaths)
+            {
                 using (FileStream stream = File.Open(f, FileMode.Open))
                 {
                     BinaryReader reader = new BinaryReader(stream);
@@ -357,10 +375,26 @@ namespace Org.Apache.REEF.IO.Tests
         /// </summary>
         /// <param name="fileFolder"></param>
         /// <returns></returns>
+        [Obsolete("Remove after 0.14")]
         public IEnumerable<Row> Deserialize(string fileFolder)
         {
+            ISet<string> files = new HashSet<string>();
             foreach (var f in Directory.GetFiles(fileFolder))
             {
+                files.Add(f);
+            }
+            return Deserialize(files);
+        }
+
+        /// <summary>
+        /// Read all the files in the set and return the rows read one by one
+        /// </summary>
+        /// <param name="filePaths"></param>
+        /// <returns></returns>
+        public IEnumerable<Row> Deserialize(ISet<string> filePaths)
+        {
+            foreach (var f in filePaths)
+            {
                 using (FileStream stream = File.Open(f, FileMode.Open))
                 {
                     BinaryReader reader = new BinaryReader(stream);

http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
index 016cb7c..fc321b9 100644
--- a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
+++ b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj
@@ -100,6 +100,7 @@ under the License.
     <Compile Include="FileSystem\Local\LocalFileSystemConfiguration.cs" />
     <Compile Include="PartitionedData\FileSystem\FileSystemInputPartition.cs" />
     <Compile Include="PartitionedData\FileSystem\FileInputPartitionDescriptor.cs" />
+    <Compile Include="PartitionedData\FileSystem\Parameters\CopyToLocal.cs" />
     <Compile Include="PartitionedData\FileSystem\Parameters\FileDeSerializerConfigString.cs" />
     <Compile Include="PartitionedData\FileSystem\IFileDeSerializer.cs" />
     <Compile Include="PartitionedData\FileSystem\FileSystemPartitionInputDataSet.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
index aba657a..cccc2be 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
@@ -38,13 +38,15 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         private readonly IFileDeSerializer<T> _fileSerializer;
         private readonly ISet<string> _filePaths;
         private bool _isInitialized;
+        private readonly bool _copyToLocal;
         private readonly object _lock = new object();
-        private string _localFileFolder;
         private readonly ITempFileCreator _tempFileCreator;
+        private readonly ISet<string> _localFiles = new HashSet<string>();
 
         [Inject]
         private FileSystemInputPartition([Parameter(typeof(PartitionId))] string id,
             [Parameter(typeof(FilePathsInInputPartition))] ISet<string> filePaths,
+            [Parameter(typeof(CopyToLocal))] bool copyToLocal,
             IFileSystem fileSystem,
             ITempFileCreator tempFileCreator,
             IFileDeSerializer<T> fileSerializer)
@@ -55,6 +57,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
             _filePaths = filePaths;
             _tempFileCreator = tempFileCreator;
             _isInitialized = false;
+            _copyToLocal = copyToLocal;
         }
 
         public string Id
@@ -82,17 +85,22 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         /// <returns></returns>
         public T GetPartitionHandle()
         {
-            if (!_isInitialized)
+            if (_copyToLocal)
             {
-                Initialize();
+                if (!_isInitialized)
+                {
+                    Initialize();
+                }
+
+                return _fileSerializer.Deserialize(_localFiles);
             }
-            return _fileSerializer.Deserialize(_localFileFolder);
+            return _fileSerializer.Deserialize(_filePaths);
         }
 
         private void CopyFromRemote()
         {
-            _localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
-            Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", _localFileFolder));
+            string localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-");
+            Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder));
 
             foreach (var sourceFilePath in _filePaths)
             {
@@ -104,7 +112,9 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
                         "Remote File {0} does not exists.", sourceUri));
                 }
 
-                var localFilePath = _localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8);
+                var localFilePath = localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8);
+                _localFiles.Add(localFilePath);
+
                 Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "LocalFilePath {0}: ", localFilePath));
                 if (File.Exists(localFilePath))
                 {
@@ -134,14 +144,12 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         /// </summary>
         public void Dispose()
         {
-            if (_localFileFolder != null)
+            if (_localFiles.Count > 0)
             {
-                foreach (var fileName in Directory.GetFiles(_localFileFolder))
+                foreach (var fileName in _localFiles)
                 {
                     File.Delete(fileName);
                 }
-                Directory.Delete(_localFileFolder);
-                _localFileFolder = null;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs
index 6759937..988b11d 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
+using System.Collections.Generic;
+
 namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
 {
     /// <summary>
@@ -30,6 +33,17 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         /// </summary>
         /// <param name="fileFolder"></param>
         /// <returns></returns>
+        [Obsolete("Remove after 0.14")]
         T Deserialize(string fileFolder);
+
+        /// <summary>
+        /// The input is a set of file paths. 
+        /// The output is of type T which is defined by the client.
+        /// If there is any IO error, IOException could be thrown.
+        /// </summary>
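+        /// <param name="filePaths">The set of file paths to deserialize; these are local copies
+        /// when CopyToLocal is true and the original (possibly remote) paths otherwise.</param>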
+        /// <returns></returns>
+        T Deserialize(ISet<string> filePaths);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs
new file mode 100644
index 0000000..5a2da3e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters
+{
+    /// <summary>
+    /// The flag that specifies if the remote files need to be copied to local or not
+    /// </summary>
+    [NamedParameter(Documentation = "Specify if the remote files need to be copied to local", ShortName = "copyToLocal", DefaultValue = "true")]
+    internal sealed class CopyToLocal : Name<bool>
+    {
+    }
+}
\ No newline at end of file