You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/05 07:14:48 UTC

incubator-systemml git commit: [SYSTEMML-1464] Fix missing matrix/frame csv read from input streams

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 1d1a9fa40 -> 23709ec60


[SYSTEMML-1464] Fix missing matrix/frame csv read from input streams

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/23709ec6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/23709ec6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/23709ec6

Branch: refs/heads/master
Commit: 23709ec6088af53163147eab72b2b9c06a3a637c
Parents: 1d1a9fa
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Apr 5 00:14:10 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Apr 5 00:14:10 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/api/jmlc/Connection.java   | 106 +++++++----
 .../apache/sysml/runtime/io/FrameReader.java    |  17 +-
 .../runtime/io/FrameReaderBinaryBlock.java      |   9 +-
 .../sysml/runtime/io/FrameReaderTextCSV.java    |  23 ++-
 .../sysml/runtime/io/FrameReaderTextCell.java   |   7 +-
 .../apache/sysml/runtime/io/MatrixReader.java   |   4 +
 .../sysml/runtime/io/ReaderBinaryBlock.java     |   8 +
 .../sysml/runtime/io/ReaderBinaryCell.java      |   9 +-
 .../apache/sysml/runtime/io/ReaderTextCSV.java  | 189 +++++++++++--------
 .../sysml/runtime/io/ReaderTextCSVParallel.java |   8 +
 .../apache/sysml/runtime/io/ReaderTextCell.java |   1 +
 .../runtime/io/ReaderTextCellParallel.java      |   8 +
 .../runtime/util/InputStreamInputFormat.java    |  96 ++++++++++
 .../functions/jmlc/JMLCInputStreamReadTest.java | 183 ++++++++++++++++++
 .../functions/jmlc/ZPackageSuite.java           |   1 +
 15 files changed, 540 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/api/jmlc/Connection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
index a3d7ae7..5240dc4 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
@@ -50,11 +50,9 @@ import org.apache.sysml.runtime.controlprogram.Program;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
-import org.apache.sysml.runtime.io.FrameReaderTextCell;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.io.MatrixReaderFactory;
-import org.apache.sysml.runtime.io.ReaderTextCell;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -324,11 +322,11 @@ public class Connection implements Closeable
 	}
 	
 	/**
-	 * Converts an input string representation of a matrix in textcell format
+	 * Converts an input string representation of a matrix in csv or textcell format
 	 * into a dense double array. The meta data string is the SystemML generated
 	 * .mtd file including the number of rows and columns.
 	 * 
-	 * @param input string matrix in textcell format
+	 * @param input string matrix in csv or textcell format
 	 * @param meta string representing SystemML matrix metadata in JSON format
 	 * @return matrix as a two-dimensional double array
 	 * @throws IOException if IOException occurs
@@ -342,15 +340,10 @@ public class Connection implements Closeable
 			int rows = jmtd.getInt(DataExpression.READROWPARAM);
 			int cols = jmtd.getInt(DataExpression.READCOLPARAM);
 			String format = jmtd.getString(DataExpression.FORMAT_TYPE);
-	
-			//sanity check input format
-			if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
-				||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format))) {
-				throw new IOException("Invalid input format (expected: text or mm): "+format);
-			}
 			
 			//parse the input matrix
-			return convertToDoubleMatrix(input, rows, cols);
+			InputStream is = IOUtilFunctions.toInputStream(input);
+			return convertToDoubleMatrix(is, rows, cols, format);
 		}
 		catch(Exception ex) {
 			throw new IOException(ex);
@@ -359,9 +352,7 @@ public class Connection implements Closeable
 	
 	/**
 	 * Converts an input string representation of a matrix in textcell format
-	 * into a dense double array. The number of rows and columns need to be 
-	 * specified because textcell only represents non-zero values and hence
-	 * does not define the dimensions in the general case.
+	 * into a dense double array. 
 	 * 
 	 * @param input string matrix in textcell format
 	 * @param rows number of rows in the matrix
@@ -378,9 +369,7 @@ public class Connection implements Closeable
 	
 	/**
 	 * Converts an input stream of a string matrix in textcell format
-	 * into a dense double array. The number of rows and columns need to be 
-	 * specified because textcell only represents non-zero values and hence
-	 * does not define the dimensions in the general case.
+	 * into a dense double array. 
 	 * 
 	 * @param input InputStream to a string matrix in textcell format
 	 * @param rows number of rows in the matrix
@@ -388,15 +377,40 @@ public class Connection implements Closeable
 	 * @return matrix as a two-dimensional double array
 	 * @throws IOException if IOException occurs
 	 */
-	public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols) 
+	public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols) throws IOException {
+		return convertToDoubleMatrix(input, rows, cols, DataExpression.FORMAT_TYPE_VALUE_TEXT);
+	}
+	
+	/**
+	 * Converts an input stream of a string matrix in csv or textcell format
+	 * into a dense double array. 
+	 * 
+	 * @param input InputStream to a string matrix in csv or textcell format
+	 * @param rows number of rows in the matrix
+	 * @param cols number of columns in the matrix
+	 * @param format input format of the given stream
+	 * @return matrix as a two-dimensional double array
+	 * @throws IOException if IOException occurs
+	 */
+	public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols, String format) 
 		throws IOException
 	{
 		double[][] ret = null;
+
+		//sanity check input format
+		if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
+			||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format)
+			||DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format)) ) {
+			throw new IOException("Invalid input format (expected: csv, text or mm): "+format);
+		}
 		
 		try {
 			//read input matrix
-			ReaderTextCell reader = (ReaderTextCell)MatrixReaderFactory.createMatrixReader(InputInfo.TextCellInputInfo);
-			MatrixBlock mb = reader.readMatrixFromInputStream(input, rows, cols, ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), (long)rows*cols);
+			InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
+					InputInfo.CSVInputInfo : InputInfo.TextCellInputInfo;
+			MatrixReader reader = MatrixReaderFactory.createMatrixReader(iinfo);
+			MatrixBlock mb = reader.readMatrixFromInputStream(input, rows, cols, 
+					ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), (long)rows*cols);
 		
 			//convert to double array
 			ret = DataConverter.convertToDoubleMatrix( mb );
@@ -467,11 +481,11 @@ public class Connection implements Closeable
 	}
 	
 	/**
-	 * Converts an input string representation of a frame in textcell format
+	 * Converts an input string representation of a frame in csv or textcell format
 	 * into a dense string array. The meta data string is the SystemML generated
 	 * .mtd file including the number of rows and columns.
 	 * 
-	 * @param input string frame in textcell format
+	 * @param input string frame in csv or textcell format
 	 * @param meta string representing SystemML frame metadata in JSON format
 	 * @return frame as a two-dimensional string array
 	 * @throws IOException if IOException occurs
@@ -485,15 +499,10 @@ public class Connection implements Closeable
 			int rows = jmtd.getInt(DataExpression.READROWPARAM);
 			int cols = jmtd.getInt(DataExpression.READCOLPARAM);
 			String format = jmtd.getString(DataExpression.FORMAT_TYPE);
-	
-			//sanity check input format
-			if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
-				||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format))) {
-				throw new IOException("Invalid input format (expected: text or mm): "+format);
-			}
 			
 			//parse the input frame
-			return convertToStringFrame(input, rows, cols);
+			InputStream is = IOUtilFunctions.toInputStream(input);
+			return convertToStringFrame(is, rows, cols, format);
 		}
 		catch(Exception ex) {
 			throw new IOException(ex);
@@ -502,9 +511,7 @@ public class Connection implements Closeable
 	
 	/**
 	 * Converts an input string representation of a frame in textcell format
-	 * into a dense string array. The number of rows and columns need to be 
-	 * specified because textcell only represents non-zero values and hence
-	 * does not define the dimensions in the general case.
+	 * into a dense string array. 
 	 * 
 	 * @param input string frame in textcell format
 	 * @param rows number of rows in the frame
@@ -521,9 +528,7 @@ public class Connection implements Closeable
 	
 	/**
 	 * Converts an input stream of a string frame in textcell format
-	 * into a dense string array. The number of rows and columns need to be 
-	 * specified because textcell only represents non-zero values and hence
-	 * does not define the dimensions in the general case.
+	 * into a dense string array. 
 	 * 
 	 * @param input InputStream to a string frame in textcell format
 	 * @param rows number of rows in the frame
@@ -531,14 +536,38 @@ public class Connection implements Closeable
 	 * @return frame as a two-dimensional string array
 	 * @throws IOException if IOException occurs
 	 */
-	public String[][] convertToStringFrame(InputStream input, int rows, int cols) 
+	public String[][] convertToStringFrame(InputStream input, int rows, int cols) throws IOException {
+		return convertToStringFrame(input, rows, cols, DataExpression.FORMAT_TYPE_VALUE_TEXT);
+	}
+	
+	/**
+	 * Converts an input stream of a string frame in csv or textcell format
+	 * into a dense string array. 
+	 * 
+	 * @param input InputStream to a string frame in csv or textcell format
+	 * @param rows number of rows in the frame
+	 * @param cols number of columns in the frame
+	 * @param format input format of the given stream
+	 * @return frame as a two-dimensional string array
+	 * @throws IOException if IOException occurs
+	 */
+	public String[][] convertToStringFrame(InputStream input, int rows, int cols, String format) 
 		throws IOException
 	{
 		String[][] ret = null;
+	
+		//sanity check input format
+		if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
+			||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format)
+			||DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format))) {
+			throw new IOException("Invalid input format (expected: csv, text or mm): "+format);
+		}
 		
 		try {
-			//read input matrix
-			FrameReaderTextCell reader = (FrameReaderTextCell)FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo);
+			//read input frame
+			InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
+					InputInfo.CSVInputInfo : InputInfo.TextCellInputInfo;
+			FrameReader reader = FrameReaderFactory.createFrameReader(iinfo);
 			FrameBlock mb = reader.readFrameFromInputStream(input, rows, cols);
 		
 			//convert to double array
@@ -551,7 +580,6 @@ public class Connection implements Closeable
 		return ret;
 	}
 	
-	
 	////////////////////////////////////////////
 	// Read transform meta data
 	////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 3aac76b..321735d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.io;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.LinkedList;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -42,7 +43,6 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  */
 public abstract class FrameReader 
 {
-
 	public abstract FrameBlock readFrameFromHDFS( String fname, ValueType[] schema, String[] names, long rlen, long clen)
 		throws IOException, DMLRuntimeException;
 
@@ -58,6 +58,21 @@ public abstract class FrameReader
 		return readFrameFromHDFS(fname, getDefSchema(clen), getDefColNames(clen), rlen, clen);
 	}
 
+	public abstract FrameBlock readFrameFromInputStream( InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
+		throws IOException, DMLRuntimeException;
+
+	public FrameBlock readFrameFromInputStream( InputStream is, ValueType[] schema, long rlen, long clen )
+		throws IOException, DMLRuntimeException
+	{
+		return readFrameFromInputStream(is, schema, getDefColNames(schema.length), rlen, clen);
+	}
+
+	public FrameBlock readFrameFromInputStream( InputStream is, long rlen, long clen )
+		throws IOException, DMLRuntimeException
+	{
+		return readFrameFromInputStream(is, getDefSchema(clen), getDefColNames(clen), rlen, clen);
+	}
+
 	public ValueType[] getDefSchema( long clen )
 		throws IOException, DMLRuntimeException
 	{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index a9df026..32feea3 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,7 +39,6 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock;
  */
 public class FrameReaderBinaryBlock extends FrameReader
 {
-
 	@Override
 	public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) 
 		throws IOException, DMLRuntimeException 
@@ -61,6 +61,13 @@ public class FrameReaderBinaryBlock extends FrameReader
 		
 		return ret;
 	}
+	
+	@Override
+	public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
+		throws IOException, DMLRuntimeException 
+	{
+		throw new DMLRuntimeException("Not implemented yet.");
+	}
 
 	protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen )
 		throws IOException, DMLRuntimeException

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index 707071f..e86cbe2 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -38,6 +40,7 @@ import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.util.InputStreamInputFormat;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
@@ -83,6 +86,24 @@ public class FrameReaderTextCSV extends FrameReader
 		
 		return ret;
 	}
+	
+	@Override
+	public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, 
+			long rlen, long clen)
+		throws IOException, DMLRuntimeException 
+	{
+		//allocate output frame block
+		ValueType[] lschema = createOutputSchema(schema, clen);
+		String[] lnames = createOutputNames(names, clen);
+		FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
+	
+		//core read (sequential: the input stream is consumed as a single split)
+		InputStreamInputFormat informat = new InputStreamInputFormat(is);
+		InputSplit split = informat.getSplits(null, 1)[0];
+		readCSVFrameFromInputSplit(split, informat, null, ret, schema, names, rlen, clen, 0, true);
+		
+		return ret;
+	}
 
 	protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, 
 			FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen) 
@@ -96,7 +117,7 @@ public class FrameReaderTextCSV extends FrameReader
 			readCSVFrameFromInputSplit(splits[i], informat, job, dest, schema, names, rlen, clen, 0, i==0);
 	}
 
-	protected final void readCSVFrameFromInputSplit( InputSplit split, TextInputFormat informat, JobConf job, 
+	protected final void readCSVFrameFromInputSplit( InputSplit split, InputFormat<LongWritable,Text> informat, JobConf job, 
 			FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen, int rl, boolean first)
 		throws IOException
 	{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
index e8be829..548452f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
@@ -47,7 +47,6 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  */
 public class FrameReaderTextCell extends FrameReader
 {
-
 	@Override
 	public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen)
 		throws IOException, DMLRuntimeException
@@ -71,11 +70,7 @@ public class FrameReaderTextCell extends FrameReader
 		return ret;
 	}
 
-	public final FrameBlock readFrameFromInputStream(InputStream is, long rlen, long clen) 
-		throws IOException, DMLRuntimeException {
-		return readFrameFromInputStream(is, getDefSchema(clen), getDefColNames(clen), rlen, clen);
-	}
-
+	@Override
 	public final FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen) 
 		throws IOException, DMLRuntimeException 
 	{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index 75de6a8..ffe290e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.io;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,6 +55,9 @@ public abstract class MatrixReader
 	public abstract MatrixBlock readMatrixFromHDFS( String fname, long rlen, long clen, int brlen, int bclen, long estnnz )
 		throws IOException, DMLRuntimeException;
 
+	public abstract MatrixBlock readMatrixFromInputStream( InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz )
+			throws IOException, DMLRuntimeException;
+	
 	public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) 
 		throws IOException
 	{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 015ae25..4c8549e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 
@@ -73,6 +74,13 @@ public class ReaderBinaryBlock extends MatrixReader
 		
 		return ret;
 	}
+	
+	@Override
+	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
+		throws IOException, DMLRuntimeException 
+	{
+		throw new DMLRuntimeException("Not implemented yet.");
+	}
 
 	public ArrayList<IndexedMatrixValue> readIndexedMatrixBlocksFromHDFS(String fname, long rlen, long clen, int brlen, int bclen) 
 		throws IOException, DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index f148ceb..dcf9e7b 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,7 +35,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 
 public class ReaderBinaryCell extends MatrixReader
 {
-
 	@Override
 	public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int brlen, int bclen, long estnnz) 
 		throws IOException, DMLRuntimeException 
@@ -60,6 +60,13 @@ public class ReaderBinaryCell extends MatrixReader
 		return ret;
 	}
 
+	@Override
+	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
+		throws IOException, DMLRuntimeException 
+	{
+		throw new DMLRuntimeException("Not implemented yet.");
+	}
+	
 	@SuppressWarnings("deprecation")
 	private void readBinaryCellMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen )
 		throws IOException

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
index 9d8f368..6256955 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
@@ -21,12 +21,14 @@ package org.apache.sysml.runtime.io;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,15 +43,12 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class ReaderTextCSV extends MatrixReader
 {
-
 	private CSVFileFormatProperties _props = null;
 	
-	public ReaderTextCSV(CSVFileFormatProperties props)
-	{
+	public ReaderTextCSV(CSVFileFormatProperties props) {
 		_props = props;
 	}
 	
-
 	@Override
 	public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int brlen, int bclen, long estnnz) 
 		throws IOException, DMLRuntimeException 
@@ -77,12 +76,31 @@ public class ReaderTextCSV extends MatrixReader
 		
 		return ret;
 	}
-
+	
+	@Override
+	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
+		throws IOException, DMLRuntimeException 
+	{
+		//allocate output matrix block
+		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
+		
+		//core read 
+		long lnnz = readCSVMatrixFromInputStream(is, "external inputstream", ret, new MutableInt(0), rlen, clen, 
+			brlen, bclen, _props.hasHeader(), _props.getDelim(), _props.isFill(), _props.getFillValue(), true);
+				
+		//finally check if change of sparse/dense block representation required
+		ret.setNonZeros( lnnz );
+		ret.examSparsity();
+		
+		return ret;
+	}
+	
 	@SuppressWarnings("unchecked")
 	private MatrixBlock readCSVMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, 
 			long rlen, long clen, int brlen, int bclen, boolean hasHeader, String delim, boolean fill, double fillValue )
 		throws IOException
 	{
+		//prepare file paths in alphanumeric order
 		ArrayList<Path> files=new ArrayList<Path>();
 		if(fs.isDirectory(path)) {
 			for(FileStatus stat: fs.listStatus(path, CSVReblockMR.hiddenFileFilter))
@@ -92,107 +110,118 @@ public class ReaderTextCSV extends MatrixReader
 		else
 			files.add(path);
 		
+		//determine matrix size via additional pass if required
 		if ( dest == null ) {
 			dest = computeCSVSize(files, job, fs, hasHeader, delim, fill, fillValue);
 			clen = dest.getNumColumns();
 		}
 		
-		boolean sparse = dest.isInSparseFormat();
+		//actual read of individual files
+		long lnnz = 0;
+		MutableInt row = new MutableInt(0);
+		for(int fileNo=0; fileNo<files.size(); fileNo++) {
+			lnnz += readCSVMatrixFromInputStream(fs.open(files.get(fileNo)), path.toString(), dest, 
+				row, rlen, clen, brlen, bclen, hasHeader, delim, fill, fillValue, fileNo==0);
+		}
+		
+		//post processing
+		dest.setNonZeros( lnnz );
 		
-		/////////////////////////////////////////
+		return dest;
+	}
+	
+	private long readCSVMatrixFromInputStream( InputStream is, String srcInfo, MatrixBlock dest, MutableInt rowPos, 
+			long rlen, long clen, int brlen, int bclen, boolean hasHeader, String delim, boolean fill, double fillValue, boolean first )
+		throws IOException
+	{
+		boolean sparse = dest.isInSparseFormat();
 		String value = null;
-		int row = 0;
-		int col = -1;
+		int row = rowPos.intValue();
 		double cellValue = 0;
 		long lnnz = 0;
 		
-		for(int fileNo=0; fileNo<files.size(); fileNo++)
+		BufferedReader br = new BufferedReader(new InputStreamReader(is));
+		if(first && hasHeader ) 
+			br.readLine(); //ignore header
+		
+		// Read the data
+		boolean emptyValuesFound = false;
+		try
 		{
-			BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo))));
-			if(fileNo==0 && hasHeader ) 
-				br.readLine(); //ignore header
-			
-			// Read the data
-			boolean emptyValuesFound = false;
-			try
+			if( sparse ) //SPARSE<-value
 			{
-				if( sparse ) //SPARSE<-value
+				while( (value=br.readLine())!=null ) //foreach line
 				{
-					while( (value=br.readLine())!=null ) //foreach line
+					String cellStr = value.toString().trim();
+					emptyValuesFound = false;
+					String[] parts = IOUtilFunctions.split(cellStr, delim);
+					int col = 0;
+					
+					for(String part : parts) //foreach cell
 					{
-						String cellStr = value.toString().trim();
-						emptyValuesFound = false;
-						String[] parts = IOUtilFunctions.split(cellStr, delim);
-						col = 0;
-						
-						for(String part : parts) //foreach cell
-						{
-							part = part.trim();
-							if ( part.isEmpty() ) {
-								emptyValuesFound = true;
-								cellValue = fillValue;
-							}
-							else {
-								cellValue = UtilFunctions.parseToDouble(part);
-							}
-							if ( cellValue != 0 ) {
-								dest.appendValue(row, col, cellValue);
-								lnnz++;
-							}
-							col++;
+						part = part.trim();
+						if ( part.isEmpty() ) {
+							emptyValuesFound = true;
+							cellValue = fillValue;
 						}
-						
-						//sanity checks for empty values and number of columns
-						IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
-						IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, parts, clen);
-						row++;
+						else {
+							cellValue = UtilFunctions.parseToDouble(part);
+						}
+						if ( cellValue != 0 ) {
+							dest.appendValue(row, col, cellValue);
+							lnnz++;
+						}
+						col++;
 					}
-				} 
-				else //DENSE<-value
+					
+					//sanity checks for empty values and number of columns
+					IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
+					IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(srcInfo, cellStr, parts, clen);
+					row++;
+				}
+			} 
+			else //DENSE<-value
+			{
+				while( (value=br.readLine())!=null ) //foreach line
 				{
-					while( (value=br.readLine())!=null ) //foreach line
+					String cellStr = value.toString().trim();
+					emptyValuesFound = false;
+					String[] parts = IOUtilFunctions.split(cellStr, delim);
+					int col = 0;
+					
+					for( String part : parts ) //foreach cell
 					{
-						String cellStr = value.toString().trim();
-						emptyValuesFound = false;
-						String[] parts = IOUtilFunctions.split(cellStr, delim);
-						col = 0;
-						
-						for( String part : parts ) //foreach cell
-						{
-							part = part.trim();
-							if ( part.isEmpty() ) {
-								emptyValuesFound = true;
-								cellValue = fillValue;
-							}
-							else {
-								cellValue = UtilFunctions.parseToDouble(part);
-							}
-							if ( cellValue != 0 ) {
-								dest.setValueDenseUnsafe(row, col, cellValue);
-								lnnz++;
-							}
-							col++;
+						part = part.trim();
+						if ( part.isEmpty() ) {
+							emptyValuesFound = true;
+							cellValue = fillValue;
+						}
+						else {
+							cellValue = UtilFunctions.parseToDouble(part);
+						}
+						if ( cellValue != 0 ) {
+							dest.setValueDenseUnsafe(row, col, cellValue);
+							lnnz++;
 						}
-						
-						//sanity checks for empty values and number of columns
-						IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
-						IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, parts, clen);
-						row++;
+						col++;
 					}
+					
+					//sanity checks for empty values and number of columns
+					IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
+					IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(srcInfo, cellStr, parts, clen);
+					row++;
 				}
 			}
-			finally {
-				IOUtilFunctions.closeSilently(br);
-			}
+		}
+		finally {
+			IOUtilFunctions.closeSilently(br);
 		}
 		
-		//post processing
-		dest.setNonZeros( lnnz );
-		
-		return dest;
+		rowPos.setValue(row);
+		return lnnz;
 	}
 
-	private MatrixBlock computeCSVSize ( List<Path> files, JobConf job, FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue) 
+	private MatrixBlock computeCSVSize( List<Path> files, JobConf job, FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue) 
 		throws IOException 
 	{		
 		int nrow = -1;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 15d4858..75b3bd9 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -111,6 +112,13 @@ public class ReaderTextCSVParallel extends MatrixReader
 		return ret;
 	}
 
+	@Override
+	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
+		throws IOException, DMLRuntimeException 
+	{
+		throw new DMLRuntimeException("Not implemented yet."); //stream read not supported by this reader: always fails, all arguments are ignored
+	}
+	
 	private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job, 
 			MatrixBlock dest, long rlen, long clen, int brlen, int bclen, 
 			boolean hasHeader, String delim, boolean fill, double fillValue) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index 1c9cba5..3b93c33 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -79,6 +79,7 @@ public class ReaderTextCell extends MatrixReader
 		return ret;
 	}
 
+	@Override
 	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
 		throws IOException, DMLRuntimeException 
 	{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index f693455..9501b6d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -102,6 +103,13 @@ public class ReaderTextCellParallel extends MatrixReader
 		return ret;
 	}
 
+	@Override
+	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
+		throws IOException, DMLRuntimeException 
+	{
+		throw new DMLRuntimeException("Not implemented yet."); //stream read not supported by this reader: always fails, all arguments are ignored
+	}
+	
 	private void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket )
 		throws IOException
 	{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java b/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
new file mode 100644
index 0000000..83641f6
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Custom input format and record reader to redirect the common implementation of csv read 
+ * over record readers (which are required for the parallel readers) to an input stream.
+ * The stream is exposed as a single dummy split and read line by line, decoded as UTF-8.
+ */
+public class InputStreamInputFormat implements InputFormat<LongWritable, Text>
+{
+	private final InputStream _input;
+		
+	public InputStreamInputFormat(InputStream is) {
+		_input = is;
+	}
+
+	@Override
+	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+		//return dummy handle - stream accessed purely over record reader
+		return new InputSplit[]{new FileSplit(null)};
+	}
+
+	@Override
+	public RecordReader<LongWritable,Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+		return new InputStreamRecordReader(_input); //arguments ignored - every reader wraps the same stream
+	}
+	//record reader that returns the lines of the wrapped stream as values (keys are never set)
+	private static class InputStreamRecordReader implements RecordReader<LongWritable, Text>
+	{
+		private final BufferedReader _reader;
+		//decode as UTF-8 instead of the platform default charset (hadoop Text is UTF-8 as well)
+		public InputStreamRecordReader(InputStream is) {
+			_reader = new BufferedReader(new InputStreamReader( is, java.nio.charset.StandardCharsets.UTF_8 ));
+		}
+		
+		@Override
+		public LongWritable createKey() {
+			return new LongWritable();
+		}
+		@Override
+		public Text createValue() {
+			return new Text();
+		}			
+		@Override
+		public float getProgress() throws IOException {
+			return 0; //progress is not tracked for streams
+		}
+		@Override
+		public long getPos() throws IOException {
+			return 0; //position is not tracked for streams
+		}
+		@Override
+		public boolean next(LongWritable key, Text value) throws IOException {
+			String line = _reader.readLine(); //null at end of stream
+			if( line != null )
+				value.set(line);
+			return (line != null);
+		}
+		@Override
+		public void close() throws IOException {
+			_reader.close(); //also closes the wrapped input stream
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
new file mode 100644
index 0000000..05ee456
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.jmlc;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+import org.apache.sysml.api.jmlc.Connection;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.runtime.io.FrameWriter;
+import org.apache.sysml.runtime.io.FrameWriterFactory;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
+import org.apache.sysml.runtime.io.MatrixWriter;
+import org.apache.sysml.runtime.io.MatrixWriterFactory;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class JMLCInputStreamReadTest extends AutomatedTestBase 
+{	//round-trip tests: inputs are written to file, read back via the JMLC input stream API, and compared
+	private final static String TEST_NAME = "jmlc";
+	private final static String TEST_DIR = "functions/jmlc/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + JMLCInputStreamReadTest.class.getSimpleName() + "/";
+	//dimensions of the randomly generated input data
+	private final static int rows = 700;
+	private final static int cols = 3;
+	//sparsity of the generated data: sparsity1 for the dense, sparsity2 for the sparse test cases
+	private final static double sparsity1 = 0.7;
+	private final static double sparsity2 = 0.1;
+	
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { "R" }) ); 
+	}
+	
+	@Test
+	public void testInputStreamReadMatrixDenseCSV() throws IOException {
+		runJMLCInputStreamReadTest(DataType.MATRIX, false, "csv", false);
+	}
+	
+	@Test
+	public void testInputStreamReadMatrixDenseText() throws IOException {
+		runJMLCInputStreamReadTest(DataType.MATRIX, false, "text", false);
+	}
+	
+	@Test
+	public void testInputStreamReadMatrixSparseCSV() throws IOException {
+		runJMLCInputStreamReadTest(DataType.MATRIX, true, "csv", false);
+	}
+	
+	@Test
+	public void testInputStreamReadMatrixSparseText() throws IOException {
+		runJMLCInputStreamReadTest(DataType.MATRIX, true, "text", false);
+	}
+	
+	@Test
+	public void testInputStreamReadFrameDenseCSV() throws IOException {
+		runJMLCInputStreamReadTest(DataType.FRAME, false, "csv", false);
+	}
+	
+	@Test
+	public void testInputStreamReadFrameDenseText() throws IOException {
+		runJMLCInputStreamReadTest(DataType.FRAME, false, "text", false);
+	}
+	
+	@Test
+	public void testInputStreamReadFrameSparseCSV() throws IOException {
+		runJMLCInputStreamReadTest(DataType.FRAME, true, "csv", false);
+	}
+	
+	@Test
+	public void testInputStreamReadFrameSparseText() throws IOException {
+		runJMLCInputStreamReadTest(DataType.FRAME, true, "text", false);
+	}
+	
+	@Test
+	public void testInputStreamReadFrameDenseCSVMeta() throws IOException {
+		runJMLCInputStreamReadTest(DataType.FRAME, false, "csv", true);
+	}
+	
+	@Test
+	public void testInputStreamReadFrameDenseTextMeta() throws IOException {
+		runJMLCInputStreamReadTest(DataType.FRAME, false, "text", true);
+	}
+	
+	@Test
+	public void testInputStreamReadFrameSparseCSVMeta() throws IOException {
+		runJMLCInputStreamReadTest(DataType.FRAME, true, "csv", true);
+	}
+	
+	@Test
+	public void testInputStreamReadFrameSparseTextMeta() throws IOException {
+		runJMLCInputStreamReadTest(DataType.FRAME, true, "text", true);
+	}
+	//writes random data as matrix/frame file in the given format, reads it back from an input stream, and compares both
+	private void runJMLCInputStreamReadTest(DataType dt, boolean sparse, String format, boolean metaData ) 
+		throws IOException
+	{	
+		TestConfiguration config = getTestConfiguration(TEST_NAME);
+		loadTestConfiguration(config);
+	
+		//generate inputs
+		OutputInfo oinfo = format.equals("csv") ? OutputInfo.CSVOutputInfo : OutputInfo.TextCellOutputInfo;
+		double[][] data = TestUtils.round(getRandomMatrix(rows, cols, 0.51, 7.49, sparse?sparsity2:sparsity1, 7));
+	
+		Connection conn = new Connection();
+		
+		try
+		{
+			if( dt == DataType.MATRIX ) 
+			{
+				//write input matrix
+				MatrixBlock mb = DataConverter.convertToMatrixBlock(data);
+				MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(oinfo);
+				writer.writeMatrixToHDFS(mb, output("X"), rows, cols, -1, -1, -1);
+				
+				//read matrix from input stream (stream is closed before the file cleanup, even on failure)
+				try( FileInputStream fis = new FileInputStream(output("X")) ) {
+					double[][] data2 = conn.convertToDoubleMatrix(fis, rows, cols, format);
+					//compare matrix result
+					TestUtils.compareMatrices(data, data2, rows, cols, 0);
+				}
+			}
+			else if( dt == DataType.FRAME )
+			{
+				//write input frame
+				String[][] fdata = FrameTransformTest.createFrameData(data, "V");
+				fdata[3][1] = "\"ab\"\"cdef\""; //test quoted tokens w/ inner quotes
+				if( format.equals("csv") )
+					fdata[7][2] = "\"a,bc def\""; //test delimiter and space tokens
+				FrameBlock fb = DataConverter.convertToFrameBlock(fdata);
+				if( metaData ) {
+					fb.setColumnNames(IntStream.range(0,cols).mapToObj(i -> "CC"+i)
+						.collect(Collectors.toList()).toArray(new String[0]));
+				}
+				FrameWriter writer = FrameWriterFactory.createFrameWriter(oinfo);
+				writer.writeFrameToHDFS(fb, output("X"), rows, cols);
+				
+				//read frame from input stream (stream is closed before the file cleanup, even on failure)
+				try( FileInputStream fis = new FileInputStream(output("X")) ) {
+					String[][] fdata2 = conn.convertToStringFrame(fis, rows, cols, format);
+					//compare frame result
+					TestUtils.compareFrames(fdata, fdata2, rows, cols);
+				}
+			}
+			else {
+				throw new IOException("Unsupported data type: "+dt.name());
+			}
+		}
+		catch(Exception ex) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			MapReduceTool.deleteFileIfExistOnHDFS(output("X"));
+			IOUtilFunctions.closeSilently(conn);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
index 9bf6a1c..9eb6af1 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
@@ -34,6 +34,7 @@ import org.junit.runners.Suite;
 	FrameReadMetaTest.class,
 	FrameTransformTest.class,
 	JMLCInputOutputTest.class,
+	JMLCInputStreamReadTest.class,
 	ReuseModelVariablesTest.class,
 	SystemTMulticlassSVMScoreTest.class
 })