You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/06/15 05:19:11 UTC

systemml git commit: [SYSTEMML-1697] Fix cp multi-part binary read from object stores

Repository: systemml
Updated Branches:
  refs/heads/master 7602af94f -> 717831daf


[SYSTEMML-1697] Fix cp multi-part binary read from object stores

Our CP binary readers read directly from sequence files instead of going
through record readers. When reading from object stores, this creates
problems for multi-part files because the parent directory does not
physically exist. Furthermore, the listing of part files also contains the
mtd file because it shares the common directory prefix. This patch makes
the binary readers more robust for such file systems, which are not fully
compliant with the HDFS file system API.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/717831da
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/717831da
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/717831da

Branch: refs/heads/master
Commit: 717831daff16d4c87381e6b8d4f48baa9c2922b4
Parents: 7602af9
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Jun 14 20:18:44 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Jun 14 20:22:44 2017 -0700

----------------------------------------------------------------------
 .../parfor/DataPartitionerLocal.java            |  7 ++--
 .../controlprogram/parfor/RemoteDPParForMR.java |  3 +-
 .../controlprogram/parfor/RemoteParForMR.java   |  3 +-
 .../parfor/ResultMergeLocalFile.java            |  7 ++--
 .../ParameterizedBuiltinCPFileInstruction.java  |  5 +--
 .../apache/sysml/runtime/io/FrameReader.java    | 24 ------------
 .../runtime/io/FrameReaderBinaryBlock.java      |  4 +-
 .../io/FrameReaderBinaryBlockParallel.java      |  2 +-
 .../sysml/runtime/io/IOUtilFunctions.java       | 39 ++++++++++++++++++++
 .../apache/sysml/runtime/io/MatrixReader.java   | 24 ------------
 .../sysml/runtime/io/ReaderBinaryBlock.java     |  4 +-
 .../runtime/io/ReaderBinaryBlockParallel.java   |  2 +-
 .../sysml/runtime/io/ReaderBinaryCell.java      |  2 +-
 .../sysml/runtime/util/MapReduceTool.java       |  4 +-
 14 files changed, 59 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
index f0b5674..b57f050 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
@@ -48,7 +48,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.Cell;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -246,7 +245,7 @@ public class DataPartitionerLocal extends DataPartitioner
 			MatrixIndexes key = new MatrixIndexes();
 			MatrixCell value = new MatrixCell();
 	
-			for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+			for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
 			{
 				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
 				try
@@ -333,7 +332,7 @@ public class DataPartitionerLocal extends DataPartitioner
 			MatrixIndexes key = new MatrixIndexes(); 
 			MatrixBlock value = new MatrixBlock();
 			
-			for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+			for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
 			{
 				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
 				try
@@ -408,7 +407,7 @@ public class DataPartitionerLocal extends DataPartitioner
 			
 			LinkedList<Cell> buffer = new LinkedList<Cell>();
 			
-			for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+			for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
 			{
 				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
 				try

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index 60d23c6..2bccdba 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -52,7 +52,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
@@ -253,7 +252,7 @@ public class RemoteDPParForMR
 		Text value = new Text();               //serialized var header (incl filename)
 		
 		int countAll = 0;
-		for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+		for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
 		{
 			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
 			try

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index 67c3368..1a3b8c3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -50,7 +50,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
@@ -264,7 +263,7 @@ public class RemoteParForMR
 		Text value = new Text();               //serialized var header (incl filename)
 		
 		int countAll = 0;
-		for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+		for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
 		{
 			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
 			try

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index 7dae670..9b274be 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -50,7 +50,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.Cell;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.IJV;
@@ -340,7 +339,7 @@ public class ResultMergeLocalFile extends ResultMerge
 					JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
 					Path tmpPath = new Path(in.getFileName());
 					
-					for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath) )
+					for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath) )
 					{
 						SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
 						try
@@ -477,7 +476,7 @@ public class ResultMergeLocalFile extends ResultMerge
 		Path tmpPath = new Path(mo.getFileName());
 		FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob);
 		
-		for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath))
+		for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath))
 		{
 			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
 			try
@@ -576,7 +575,7 @@ public class ResultMergeLocalFile extends ResultMerge
 		int brlen = mc.getRowsPerBlock();
 		int bclen = mc.getColsPerBlock();
 		
-		for(Path lpath: MatrixReader.getSequenceFilePaths(fs, path))
+		for(Path lpath: IOUtilFunctions.getSequenceFilePaths(fs, path))
 		{
 			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
 			try

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
index 6a7127f..3046c7e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
@@ -57,7 +57,6 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.io.MatrixWriter;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
@@ -330,7 +329,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC
 			MatrixIndexes key = new MatrixIndexes();
 			MatrixCell value = new MatrixCell();
 
-			for(Path lpath: MatrixReader.getSequenceFilePaths(fs, path))
+			for(Path lpath: IOUtilFunctions.getSequenceFilePaths(fs, path))
 			{
 				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
 				try
@@ -387,7 +386,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC
 			MatrixBlock value = new MatrixBlock();
 			boolean diagBlocks = true;
 			
-			for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path))
+			for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path))
 			{
 				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
 				

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 03271d8..584ff4d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -22,9 +22,7 @@ package org.apache.sysml.runtime.io;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.LinkedList;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sysml.hops.OptimizerUtils;
@@ -86,28 +84,6 @@ public abstract class FrameReader
 		return (clen < 0) ? new String[0] : 
 			FrameBlock.createColNames((int)clen);
 	}
-
-	public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) 
-		throws IOException
-	{
-		Path[] ret = null;
-		
-		if( fs.isDirectory(file) )
-		{
-			LinkedList<Path> tmp = new LinkedList<Path>();
-			FileStatus[] dStatus = fs.listStatus(file);
-			for( FileStatus fdStatus : dStatus )
-				if( !fdStatus.getPath().getName().startsWith("_") ) //skip internal files
-					tmp.add(fdStatus.getPath());
-			ret = tmp.toArray(new Path[0]);
-		}
-		else
-		{
-			ret = new Path[]{ file };
-		}
-		
-		return ret;
-	}
 	
 	/**
 	 * NOTE: mallocDense controls if the output matrix blocks is fully allocated, this can be redundant

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index e08aa96..7bd2c00 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -73,7 +73,7 @@ public class FrameReaderBinaryBlock extends FrameReader
 		throws IOException, DMLRuntimeException
 	{
 		//sequential read from sequence files
-		for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+		for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files 
 			readBinaryBlockFrameFromSequenceFile(lpath, job, fs, dest);
 	}
 
@@ -135,7 +135,7 @@ public class FrameReaderBinaryBlock extends FrameReader
 		FrameBlock value = new FrameBlock();
 		
 		//read first block from first file
-		Path lpath = getSequenceFilePaths(fs, path)[0];  
+		Path lpath = IOUtilFunctions.getSequenceFilePaths(fs, path)[0];  
 		SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);		
 		try {
 			reader.next(key, value);

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
index 7b2e790..45718f9 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
@@ -52,7 +52,7 @@ public class FrameReaderBinaryBlockParallel extends FrameReaderBinaryBlock
 			//create read tasks for all files
 			ExecutorService pool = Executors.newFixedThreadPool(numThreads);
 			ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>();
-			for( Path lpath : getSequenceFilePaths(fs, path) )
+			for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) )
 				tasks.add(new ReadFileTask(lpath, job, fs, dest));
 
 			//wait until all tasks have been executed

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 44d9aee..12b7438 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -27,12 +27,14 @@ import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.LinkedList;
 
 import org.apache.commons.io.input.ReaderInputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -78,6 +80,14 @@ public class IOUtilFunctions
 			|| scheme1.equals(scheme2);
 	}
 	
+	public static boolean isObjectStoreFileScheme(Path path) {
+		String scheme = (path == null || path.toUri() == null) ? null : path.toUri().getScheme();
+		if( scheme == null )
+			return false;
+		//prefix match covers the alternatives s3, s3n, s3a, swift, swift2d
+		return scheme.startsWith("s3") || scheme.startsWith("swift");
+	}
+	
 	public static void closeSilently( Closeable io ) {
 		try {
 			if( io != null )
@@ -436,6 +446,35 @@ public class IOUtilFunctions
 		return ncol;
 	}
 
+	public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) 
+		throws IOException
+	{
+		Path[] ret = null;
+		
+		//Returns the part files of a multi-part file, or the file itself otherwise.
+		//Note on object stores: Since the object store file system implementations 
+		//only emulate a file system, the directory of a multi-part file does not
+		//exist physically and hence the isDirectory call returns false. Furthermore,
+		//listStatus call returns all files with the given directory as prefix, which
+		//includes the mtd file which needs to be ignored accordingly.
+		if( fs.isDirectory(file) 
+			|| IOUtilFunctions.isObjectStoreFileScheme(file) )
+		{
+			LinkedList<Path> tmp = new LinkedList<Path>();
+			FileStatus[] dStatus = fs.listStatus(file);
+			for( FileStatus fdStatus : dStatus )
+				if( !fdStatus.getPath().getName().startsWith("_") //skip internal files
+					&& !fdStatus.getPath().toString().equals(file.toString()+".mtd") )  //mtd file (compare as strings; Path.equals(String) is always false)
+					tmp.add(fdStatus.getPath());
+			ret = tmp.toArray(new Path[0]);
+		}
+		else {
+			ret = new Path[]{ file };
+		}
+		
+		return ret;
+	}
+	
 	/**
 	 * Delete the CRC files from the local file system associated with a
 	 * particular file and its metadata file.

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index dbd1602..befccfe 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -23,14 +23,12 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sysml.hops.OptimizerUtils;
@@ -58,28 +56,6 @@ public abstract class MatrixReader
 	public abstract MatrixBlock readMatrixFromInputStream( InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz )
 			throws IOException, DMLRuntimeException;
 	
-	public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) 
-		throws IOException
-	{
-		Path[] ret = null;
-		
-		if( fs.isDirectory(file) )
-		{
-			LinkedList<Path> tmp = new LinkedList<Path>();
-			FileStatus[] dStatus = fs.listStatus(file);
-			for( FileStatus fdStatus : dStatus )
-				if( !fdStatus.getPath().getName().startsWith("_") ) //skip internal files
-					tmp.add(fdStatus.getPath());
-			ret = tmp.toArray(new Path[0]);
-		}
-		else
-		{
-			ret = new Path[]{ file };
-		}
-		
-		return ret;
-	}
-	
 	/**
 	 * NOTE: mallocDense controls if the output matrix blocks is fully allocated, this can be redundant
 	 * if binary block read and single block. 

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 16241c9..0bca17d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -138,7 +138,7 @@ public class ReaderBinaryBlock extends MatrixReader
 		if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
 			MRJobConfiguration.addBinaryBlockSerializationFramework( job );
 		
-		for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+		for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files 
 		{
 			//directly read from sequence files (individual partfiles)
 			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
@@ -207,7 +207,7 @@ public class ReaderBinaryBlock extends MatrixReader
 		if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
 			MRJobConfiguration.addBinaryBlockSerializationFramework( job );
 		
-		for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+		for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files 
 		{
 			//directly read from sequence files (individual partfiles)
 			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index 2afdb0e..e7114d8 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -89,7 +89,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 			//create read tasks for all files
 			ExecutorService pool = Executors.newFixedThreadPool(_numThreads);
 			ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>();
-			for( Path lpath : getSequenceFilePaths(fs, path) ){
+			for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ){
 				ReadFileTask t = new ReadFileTask(lpath, job, fs, dest, rlen, clen, brlen, bclen);
 				tasks.add(t);
 			}

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index 2bdc25e..00a1301 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -79,7 +79,7 @@ public class ReaderBinaryCell extends MatrixReader
 		
 		try
 		{
-			for( Path lpath : getSequenceFilePaths(fs,path) ) //1..N files 
+			for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs,path) ) //1..N files 
 			{
 				//directly read from sequence files (individual partfiles)
 				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index 5bdccfd..be6e177 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -186,7 +186,9 @@ public class MapReduceTool
 	public static boolean isFileEmpty(FileSystem fs, Path dir) throws IOException {
 		FileStatus fstat = fs.getFileStatus(dir);
 
-		if (fstat.isDirectory()) {
+		if( fstat.isDirectory() 
+			|| IOUtilFunctions.isObjectStoreFileScheme(dir) )
+		{
 			// it is a directory
 			FileStatus[] stats = fs.listStatus(dir);
 			if (stats != null) {