You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2016/02/25 01:41:17 UTC

reef git commit: [REEF-1219] Adding CopyToLocal in FileSystemInputPartitionConfiguration

Repository: reef
Updated Branches:
  refs/heads/master b45c29c5e -> 4eadc7f03


[REEF-1219] Adding CopyToLocal in FileSystemInputPartitionConfiguration

Add CopyToLocal to the FileSystemInputPartitionConfiguration module builder.
Update FileSystemPartitionInputDataSet to take it as a named parameter and pass it to the FileInputPartitionDescriptor.
Add it to GetPartitionConfiguration() in FileInputPartitionDescriptor.

JIRA: [REEF-1219](https://issues.apache.org/jira/browse/REEF-1219)

This closes #856


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4eadc7f0
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4eadc7f0
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4eadc7f0

Branch: refs/heads/master
Commit: 4eadc7f039cafd365013fb2ca7792495e6cc3154
Parents: b45c29c
Author: Julia Wang <ju...@microsoft.com>
Authored: Tue Feb 23 16:49:42 2016 -0800
Committer: Andrew Chung <af...@gmail.com>
Committed: Wed Feb 24 15:41:26 2016 -0800

----------------------------------------------------------------------
 .../HadoopFileInputPartitionTest.cs                            | 2 +-
 .../Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs  | 5 +++--
 .../PartitionedData/FileSystem/FileInputPartitionDescriptor.cs | 5 ++++-
 .../FileSystem/FileSystemInputPartitionConfiguration.cs        | 6 ++++++
 .../FileSystem/FileSystemPartitionInputDataSet.cs              | 3 ++-
 5 files changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
index a1e5f84..1633325 100644
--- a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
+++ b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
@@ -46,7 +46,6 @@ namespace Org.Apache.REEF.IO.TestClient
             var serializerConf = TangFactory.GetTang().NewConfigurationBuilder()
                 .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
                     GenericType<ByteSerializer>.Class)
-                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             var serializerConfString = (new AvroConfigurationSerializer()).ToString(serializerConf);
 
@@ -55,6 +54,7 @@ namespace Org.Apache.REEF.IO.TestClient
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions, remoteFilePath1)
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions, remoteFilePath2)
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig, serializerConfString)
+                    .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, "true")
                 .Build(),
                   HadoopFileSystemConfiguration.ConfigurationModule.Build())
                 .GetInstance<IPartitionedInputDataSet>();

http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
index 5d829c3..5cbb983 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
@@ -52,6 +52,7 @@ namespace Org.Apache.REEF.IO.Tests
             var dataSet = TangFactory.GetTang()
                 .NewInjector(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.ConfigurationModule
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions, filePaths)
+                    .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, "true")
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig,
                         GetByteSerializerConfigString())
                     .Build())
@@ -76,6 +77,7 @@ namespace Org.Apache.REEF.IO.Tests
                         sourceFilePath1 + ";" + sourceFilePath2)
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig,
                         GetByteSerializerConfigString())
+                    .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, "true")
                     .Build())
                 .GetInstance<IPartitionedInputDataSet>();
 
@@ -195,6 +197,7 @@ namespace Org.Apache.REEF.IO.Tests
                 .NewInjector(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.ConfigurationModule
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FilePathForPartitions, sourceFilePath1)
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FilePathForPartitions, sourceFilePath2)
+                    .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.CopyToLocal, "true")
                     .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FileSerializerConfig, GetRowSerializerConfigString())
                     .Build())
                 .GetInstance<IPartitionedInputDataSet>();
@@ -287,7 +290,6 @@ namespace Org.Apache.REEF.IO.Tests
                 .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(
                     GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
                     GenericType<ByteSerializer>.Class)
-                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             return (new AvroConfigurationSerializer()).ToString(serializerConf);
         }
@@ -298,7 +300,6 @@ namespace Org.Apache.REEF.IO.Tests
                 .BindImplementation<IFileDeSerializer<IEnumerable<Row>>, RowSerializer>(
                     GenericType<IFileDeSerializer<IEnumerable<Row>>>.Class,
                     GenericType<RowSerializer>.Class)
-                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             return (new AvroConfigurationSerializer()).ToString(serializerConf);
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
index a842e25..53afbf1 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
@@ -31,11 +31,13 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         private readonly string _id;
         private readonly IList<string> _filePaths;
         private readonly IConfiguration _filePartitionDeserializerConfig;
+        private readonly bool _copyToLocal;
 
-        internal FileInputPartitionDescriptor(string id, IList<string> filePaths, IConfiguration filePartitionDeserializerConfig)
+        internal FileInputPartitionDescriptor(string id, IList<string> filePaths, bool copyToLocal, IConfiguration filePartitionDeserializerConfig)
         {
             _id = id;
             _filePaths = filePaths;
+            _copyToLocal = copyToLocal;
             _filePartitionDeserializerConfig = filePartitionDeserializerConfig;
         }
 
@@ -52,6 +54,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         {
             var builder = TangFactory.GetTang().NewConfigurationBuilder()
                 .BindImplementation(GenericType<IInputPartition<T>>.Class, GenericType<FileSystemInputPartition<T>>.Class)
+                .BindNamedParameter<CopyToLocal, bool>(GenericType<CopyToLocal>.Class, _copyToLocal.ToString())
                 .BindStringNamedParam<PartitionId>(_id);
 
             foreach (string p in _filePaths)

http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
index cddbe41..ec40902 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
@@ -39,6 +39,11 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         public static readonly RequiredParameter<string> FileSerializerConfig = new RequiredParameter<string>();
 
         /// <summary>
+        /// This specify if the file needs to be copied to local in FilePathsForInputPartitions 
+        /// </summary>
+        public static readonly OptionalParameter<bool> CopyToLocal = new OptionalParameter<bool>();
+
+        /// <summary>
         /// This configuration module set FileSystemDataSet as IPartitionedDataSet.
         /// It also set required parameters for injecting FileSystemDataSet
         /// </summary>
@@ -46,6 +51,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
             .BindImplementation(GenericType<IPartitionedInputDataSet>.Class, GenericType<FileSystemPartitionInputDataSet<T>>.Class)
             .BindSetEntry(GenericType<FilePathsForInputPartitions>.Class, FilePathForPartitions)
             .BindNamedParameter(GenericType<FileDeSerializerConfigString>.Class, FileSerializerConfig)
+            .BindNamedParameter(GenericType<CopyToLocal>.Class, CopyToLocal)
             .Build();
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
index 4c68ce2..a720b6e 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
@@ -51,6 +51,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         private FileSystemPartitionInputDataSet(
             [Parameter(typeof(FilePathsForInputPartitions))] ISet<string> filePaths,
             IFileSystem fileSystem,
+            [Parameter(typeof(CopyToLocal))] bool copyToLocal,
             [Parameter(typeof(FileDeSerializerConfigString))] string fileSerializerConfigString,
             AvroConfigurationSerializer avroConfigurationSerializer)
         {
@@ -68,7 +69,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
                 var paths = path.Split(new string[] { StringSeparators }, StringSplitOptions.None);
                
                 var id = "FilePartition-" + i++;
-                _partitions[id] = new FileInputPartitionDescriptor<T>(id, paths.ToList(), fileSerializerConfig); 
+                _partitions[id] = new FileInputPartitionDescriptor<T>(id, paths.ToList(), copyToLocal, fileSerializerConfig); 
             }
         }