You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/06/15 05:19:11 UTC
systemml git commit: [SYSTEMML-1697] Fix cp multi-part binary read
from object stores
Repository: systemml
Updated Branches:
refs/heads/master 7602af94f -> 717831daf
[SYSTEMML-1697] Fix cp multi-part binary read from object stores
Our CP (control program) binary readers read directly from sequence files
rather than through record readers. When reading from object stores, this
creates problems for multi-part files because the parent directory does not
physically exist. Furthermore, the listing of part files also contains the
metadata (.mtd) file because it shares the common directory prefix. This patch
makes the binary readers more robust on file systems that do not fully comply
with the HDFS file system API.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/717831da
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/717831da
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/717831da
Branch: refs/heads/master
Commit: 717831daff16d4c87381e6b8d4f48baa9c2922b4
Parents: 7602af9
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Jun 14 20:18:44 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Jun 14 20:22:44 2017 -0700
----------------------------------------------------------------------
.../parfor/DataPartitionerLocal.java | 7 ++--
.../controlprogram/parfor/RemoteDPParForMR.java | 3 +-
.../controlprogram/parfor/RemoteParForMR.java | 3 +-
.../parfor/ResultMergeLocalFile.java | 7 ++--
.../ParameterizedBuiltinCPFileInstruction.java | 5 +--
.../apache/sysml/runtime/io/FrameReader.java | 24 ------------
.../runtime/io/FrameReaderBinaryBlock.java | 4 +-
.../io/FrameReaderBinaryBlockParallel.java | 2 +-
.../sysml/runtime/io/IOUtilFunctions.java | 39 ++++++++++++++++++++
.../apache/sysml/runtime/io/MatrixReader.java | 24 ------------
.../sysml/runtime/io/ReaderBinaryBlock.java | 4 +-
.../runtime/io/ReaderBinaryBlockParallel.java | 2 +-
.../sysml/runtime/io/ReaderBinaryCell.java | 2 +-
.../sysml/runtime/util/MapReduceTool.java | 4 +-
14 files changed, 59 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
index f0b5674..b57f050 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
@@ -48,7 +48,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.Cell;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.matrix.data.IJV;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -246,7 +245,7 @@ public class DataPartitionerLocal extends DataPartitioner
MatrixIndexes key = new MatrixIndexes();
MatrixCell value = new MatrixCell();
- for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
try
@@ -333,7 +332,7 @@ public class DataPartitionerLocal extends DataPartitioner
MatrixIndexes key = new MatrixIndexes();
MatrixBlock value = new MatrixBlock();
- for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+ for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
try
@@ -408,7 +407,7 @@ public class DataPartitionerLocal extends DataPartitioner
LinkedList<Cell> buffer = new LinkedList<Cell>();
- for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+ for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
try
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index 60d23c6..2bccdba 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -52,7 +52,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
@@ -253,7 +252,7 @@ public class RemoteDPParForMR
Text value = new Text(); //serialized var header (incl filename)
int countAll = 0;
- for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
try
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index 67c3368..1a3b8c3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -50,7 +50,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
@@ -264,7 +263,7 @@ public class RemoteParForMR
Text value = new Text(); //serialized var header (incl filename)
int countAll = 0;
- for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
try
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index 7dae670..9b274be 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -50,7 +50,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.Cell;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
import org.apache.sysml.runtime.matrix.data.IJV;
@@ -340,7 +339,7 @@ public class ResultMergeLocalFile extends ResultMerge
JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
Path tmpPath = new Path(in.getFileName());
- for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath) )
+ for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath) )
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
try
@@ -477,7 +476,7 @@ public class ResultMergeLocalFile extends ResultMerge
Path tmpPath = new Path(mo.getFileName());
FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob);
- for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath))
+ for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath))
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
try
@@ -576,7 +575,7 @@ public class ResultMergeLocalFile extends ResultMerge
int brlen = mc.getRowsPerBlock();
int bclen = mc.getColsPerBlock();
- for(Path lpath: MatrixReader.getSequenceFilePaths(fs, path))
+ for(Path lpath: IOUtilFunctions.getSequenceFilePaths(fs, path))
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
try
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
index 6a7127f..3046c7e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
@@ -57,7 +57,6 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.io.MatrixWriter;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
@@ -330,7 +329,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC
MatrixIndexes key = new MatrixIndexes();
MatrixCell value = new MatrixCell();
- for(Path lpath: MatrixReader.getSequenceFilePaths(fs, path))
+ for(Path lpath: IOUtilFunctions.getSequenceFilePaths(fs, path))
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
try
@@ -387,7 +386,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC
MatrixBlock value = new MatrixBlock();
boolean diagBlocks = true;
- for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path))
+ for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path))
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 03271d8..584ff4d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -22,9 +22,7 @@ package org.apache.sysml.runtime.io;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.util.LinkedList;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sysml.hops.OptimizerUtils;
@@ -86,28 +84,6 @@ public abstract class FrameReader
return (clen < 0) ? new String[0] :
FrameBlock.createColNames((int)clen);
}
-
- public static Path[] getSequenceFilePaths( FileSystem fs, Path file )
- throws IOException
- {
- Path[] ret = null;
-
- if( fs.isDirectory(file) )
- {
- LinkedList<Path> tmp = new LinkedList<Path>();
- FileStatus[] dStatus = fs.listStatus(file);
- for( FileStatus fdStatus : dStatus )
- if( !fdStatus.getPath().getName().startsWith("_") ) //skip internal files
- tmp.add(fdStatus.getPath());
- ret = tmp.toArray(new Path[0]);
- }
- else
- {
- ret = new Path[]{ file };
- }
-
- return ret;
- }
/**
* NOTE: mallocDense controls if the output matrix blocks is fully allocated, this can be redundant
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index e08aa96..7bd2c00 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -73,7 +73,7 @@ public class FrameReaderBinaryBlock extends FrameReader
throws IOException, DMLRuntimeException
{
//sequential read from sequence files
- for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files
readBinaryBlockFrameFromSequenceFile(lpath, job, fs, dest);
}
@@ -135,7 +135,7 @@ public class FrameReaderBinaryBlock extends FrameReader
FrameBlock value = new FrameBlock();
//read first block from first file
- Path lpath = getSequenceFilePaths(fs, path)[0];
+ Path lpath = IOUtilFunctions.getSequenceFilePaths(fs, path)[0];
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
try {
reader.next(key, value);
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
index 7b2e790..45718f9 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
@@ -52,7 +52,7 @@ public class FrameReaderBinaryBlockParallel extends FrameReaderBinaryBlock
//create read tasks for all files
ExecutorService pool = Executors.newFixedThreadPool(numThreads);
ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>();
- for( Path lpath : getSequenceFilePaths(fs, path) )
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
tasks.add(new ReadFileTask(lpath, job, fs, dest));
//wait until all tasks have been executed
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 44d9aee..12b7438 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -27,12 +27,14 @@ import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.LinkedList;
import org.apache.commons.io.input.ReaderInputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -78,6 +80,14 @@ public class IOUtilFunctions
|| scheme1.equals(scheme2);
}
+ public static boolean isObjectStoreFileScheme(Path path) {
+ if( path == null || path.toUri() == null || path.toUri().getScheme() == null )
+ return false;
+ String scheme = path.toUri().getScheme();
+ //capture multiple alternatives s3, s3n, s3a, swift, swift2d
+ return scheme.startsWith("s3") || scheme.startsWith("swift");
+ }
+
public static void closeSilently( Closeable io ) {
try {
if( io != null )
@@ -436,6 +446,35 @@ public class IOUtilFunctions
return ncol;
}
+ public static Path[] getSequenceFilePaths( FileSystem fs, Path file )
+ throws IOException
+ {
+ Path[] ret = null;
+
+ //Note on object stores: Since the object store file system implementations
+ //only emulate a file system, the directory of a multi-part file does not
+ //exist physically and hence the isDirectory call returns false. Furthermore,
+ //listStatus call returns all files with the given directory as prefix, which
+ //includes the mtd file which needs to be ignored accordingly.
+
+ if( fs.isDirectory(file)
+ || IOUtilFunctions.isObjectStoreFileScheme(file) )
+ {
+ LinkedList<Path> tmp = new LinkedList<Path>();
+ FileStatus[] dStatus = fs.listStatus(file);
+ for( FileStatus fdStatus : dStatus )
+ if( !fdStatus.getPath().getName().startsWith("_") //skip internal files
+ && !fdStatus.getPath().equals(file.toString()+".mtd") ) //mtd file
+ tmp.add(fdStatus.getPath());
+ ret = tmp.toArray(new Path[0]);
+ }
+ else {
+ ret = new Path[]{ file };
+ }
+
+ return ret;
+ }
+
/**
* Delete the CRC files from the local file system associated with a
* particular file and its metadata file.
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index dbd1602..befccfe 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -23,14 +23,12 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sysml.hops.OptimizerUtils;
@@ -58,28 +56,6 @@ public abstract class MatrixReader
public abstract MatrixBlock readMatrixFromInputStream( InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz )
throws IOException, DMLRuntimeException;
- public static Path[] getSequenceFilePaths( FileSystem fs, Path file )
- throws IOException
- {
- Path[] ret = null;
-
- if( fs.isDirectory(file) )
- {
- LinkedList<Path> tmp = new LinkedList<Path>();
- FileStatus[] dStatus = fs.listStatus(file);
- for( FileStatus fdStatus : dStatus )
- if( !fdStatus.getPath().getName().startsWith("_") ) //skip internal files
- tmp.add(fdStatus.getPath());
- ret = tmp.toArray(new Path[0]);
- }
- else
- {
- ret = new Path[]{ file };
- }
-
- return ret;
- }
-
/**
* NOTE: mallocDense controls if the output matrix blocks is fully allocated, this can be redundant
* if binary block read and single block.
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 16241c9..0bca17d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -138,7 +138,7 @@ public class ReaderBinaryBlock extends MatrixReader
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
- for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files
{
//directly read from sequence files (individual partfiles)
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
@@ -207,7 +207,7 @@ public class ReaderBinaryBlock extends MatrixReader
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
- for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files
{
//directly read from sequence files (individual partfiles)
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index 2afdb0e..e7114d8 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -89,7 +89,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
//create read tasks for all files
ExecutorService pool = Executors.newFixedThreadPool(_numThreads);
ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>();
- for( Path lpath : getSequenceFilePaths(fs, path) ){
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ){
ReadFileTask t = new ReadFileTask(lpath, job, fs, dest, rlen, clen, brlen, bclen);
tasks.add(t);
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index 2bdc25e..00a1301 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -79,7 +79,7 @@ public class ReaderBinaryCell extends MatrixReader
try
{
- for( Path lpath : getSequenceFilePaths(fs,path) ) //1..N files
+ for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs,path) ) //1..N files
{
//directly read from sequence files (individual partfiles)
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index 5bdccfd..be6e177 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -186,7 +186,9 @@ public class MapReduceTool
public static boolean isFileEmpty(FileSystem fs, Path dir) throws IOException {
FileStatus fstat = fs.getFileStatus(dir);
- if (fstat.isDirectory()) {
+ if( fstat.isDirectory()
+ || IOUtilFunctions.isObjectStoreFileScheme(dir) )
+ {
// it is a directory
FileStatus[] stats = fs.listStatus(dir);
if (stats != null) {