You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/05 07:14:48 UTC
incubator-systemml git commit: [SYSTEMML-1464] Fix missing matrix/frame csv read from input streams
Repository: incubator-systemml
Updated Branches:
refs/heads/master 1d1a9fa40 -> 23709ec60
[SYSTEMML-1464] Fix missing matrix/frame csv read from input streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/23709ec6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/23709ec6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/23709ec6
Branch: refs/heads/master
Commit: 23709ec6088af53163147eab72b2b9c06a3a637c
Parents: 1d1a9fa
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Apr 5 00:14:10 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Apr 5 00:14:10 2017 -0700
----------------------------------------------------------------------
.../org/apache/sysml/api/jmlc/Connection.java | 106 +++++++----
.../apache/sysml/runtime/io/FrameReader.java | 17 +-
.../runtime/io/FrameReaderBinaryBlock.java | 9 +-
.../sysml/runtime/io/FrameReaderTextCSV.java | 23 ++-
.../sysml/runtime/io/FrameReaderTextCell.java | 7 +-
.../apache/sysml/runtime/io/MatrixReader.java | 4 +
.../sysml/runtime/io/ReaderBinaryBlock.java | 8 +
.../sysml/runtime/io/ReaderBinaryCell.java | 9 +-
.../apache/sysml/runtime/io/ReaderTextCSV.java | 189 +++++++++++--------
.../sysml/runtime/io/ReaderTextCSVParallel.java | 8 +
.../apache/sysml/runtime/io/ReaderTextCell.java | 1 +
.../runtime/io/ReaderTextCellParallel.java | 8 +
.../runtime/util/InputStreamInputFormat.java | 96 ++++++++++
.../functions/jmlc/JMLCInputStreamReadTest.java | 183 ++++++++++++++++++
.../functions/jmlc/ZPackageSuite.java | 1 +
15 files changed, 540 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/api/jmlc/Connection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
index a3d7ae7..5240dc4 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
@@ -50,11 +50,9 @@ import org.apache.sysml.runtime.controlprogram.Program;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.io.FrameReader;
import org.apache.sysml.runtime.io.FrameReaderFactory;
-import org.apache.sysml.runtime.io.FrameReaderTextCell;
import org.apache.sysml.runtime.io.IOUtilFunctions;
import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.io.MatrixReaderFactory;
-import org.apache.sysml.runtime.io.ReaderTextCell;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -324,11 +322,11 @@ public class Connection implements Closeable
}
/**
- * Converts an input string representation of a matrix in textcell format
+ * Converts an input string representation of a matrix in csv or textcell format
* into a dense double array. The meta data string is the SystemML generated
* .mtd file including the number of rows and columns.
*
- * @param input string matrix in textcell format
+ * @param input string matrix in csv or textcell format
* @param meta string representing SystemML matrix metadata in JSON format
* @return matrix as a two-dimensional double array
* @throws IOException if IOException occurs
@@ -342,15 +340,10 @@ public class Connection implements Closeable
int rows = jmtd.getInt(DataExpression.READROWPARAM);
int cols = jmtd.getInt(DataExpression.READCOLPARAM);
String format = jmtd.getString(DataExpression.FORMAT_TYPE);
-
- //sanity check input format
- if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
- ||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format))) {
- throw new IOException("Invalid input format (expected: text or mm): "+format);
- }
//parse the input matrix
- return convertToDoubleMatrix(input, rows, cols);
+ InputStream is = IOUtilFunctions.toInputStream(input);
+ return convertToDoubleMatrix(is, rows, cols, format);
}
catch(Exception ex) {
throw new IOException(ex);
@@ -359,9 +352,7 @@ public class Connection implements Closeable
/**
* Converts an input string representation of a matrix in textcell format
- * into a dense double array. The number of rows and columns need to be
- * specified because textcell only represents non-zero values and hence
- * does not define the dimensions in the general case.
+ * into a dense double array.
*
* @param input string matrix in textcell format
* @param rows number of rows in the matrix
@@ -378,9 +369,7 @@ public class Connection implements Closeable
/**
* Converts an input stream of a string matrix in textcell format
- * into a dense double array. The number of rows and columns need to be
- * specified because textcell only represents non-zero values and hence
- * does not define the dimensions in the general case.
+ * into a dense double array.
*
* @param input InputStream to a string matrix in textcell format
* @param rows number of rows in the matrix
@@ -388,15 +377,40 @@ public class Connection implements Closeable
* @return matrix as a two-dimensional double array
* @throws IOException if IOException occurs
*/
- public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols)
+ public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols) throws IOException {
+ return convertToDoubleMatrix(input, rows, cols, DataExpression.FORMAT_TYPE_VALUE_TEXT);
+ }
+
+ /**
+ * Converts an input stream of a string matrix in csv or textcell format
+ * into a dense double array.
+ *
+ * @param input InputStream to a string matrix in csv or textcell format
+ * @param rows number of rows in the matrix
+ * @param cols number of columns in the matrix
+ * @param format input format of the given stream
+ * @return matrix as a two-dimensional double array
+ * @throws IOException if IOException occurs
+ */
+ public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols, String format)
throws IOException
{
double[][] ret = null;
+
+ //sanity check input format
+ if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
+ ||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format)
+ ||DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format)) ) {
+ throw new IOException("Invalid input format (expected: csv, text or mm): "+format);
+ }
try {
//read input matrix
- ReaderTextCell reader = (ReaderTextCell)MatrixReaderFactory.createMatrixReader(InputInfo.TextCellInputInfo);
- MatrixBlock mb = reader.readMatrixFromInputStream(input, rows, cols, ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), (long)rows*cols);
+ InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ?
+ InputInfo.CSVInputInfo : InputInfo.TextCellInputInfo;
+ MatrixReader reader = MatrixReaderFactory.createMatrixReader(iinfo);
+ MatrixBlock mb = reader.readMatrixFromInputStream(input, rows, cols,
+ ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), (long)rows*cols);
//convert to double array
ret = DataConverter.convertToDoubleMatrix( mb );
@@ -467,11 +481,11 @@ public class Connection implements Closeable
}
/**
- * Converts an input string representation of a frame in textcell format
+ * Converts an input string representation of a frame in csv or textcell format
* into a dense string array. The meta data string is the SystemML generated
* .mtd file including the number of rows and columns.
*
- * @param input string frame in textcell format
+ * @param input string frame in csv or textcell format
* @param meta string representing SystemML frame metadata in JSON format
* @return frame as a two-dimensional string array
* @throws IOException if IOException occurs
@@ -485,15 +499,10 @@ public class Connection implements Closeable
int rows = jmtd.getInt(DataExpression.READROWPARAM);
int cols = jmtd.getInt(DataExpression.READCOLPARAM);
String format = jmtd.getString(DataExpression.FORMAT_TYPE);
-
- //sanity check input format
- if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
- ||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format))) {
- throw new IOException("Invalid input format (expected: text or mm): "+format);
- }
//parse the input frame
- return convertToStringFrame(input, rows, cols);
+ InputStream is = IOUtilFunctions.toInputStream(input);
+ return convertToStringFrame(is, rows, cols, format);
}
catch(Exception ex) {
throw new IOException(ex);
@@ -502,9 +511,7 @@ public class Connection implements Closeable
/**
* Converts an input string representation of a frame in textcell format
- * into a dense string array. The number of rows and columns need to be
- * specified because textcell only represents non-zero values and hence
- * does not define the dimensions in the general case.
+ * into a dense string array.
*
* @param input string frame in textcell format
* @param rows number of rows in the frame
@@ -521,9 +528,7 @@ public class Connection implements Closeable
/**
* Converts an input stream of a string frame in textcell format
- * into a dense string array. The number of rows and columns need to be
- * specified because textcell only represents non-zero values and hence
- * does not define the dimensions in the general case.
+ * into a dense string array.
*
* @param input InputStream to a string frame in textcell format
* @param rows number of rows in the frame
@@ -531,14 +536,38 @@ public class Connection implements Closeable
* @return frame as a two-dimensional string array
* @throws IOException if IOException occurs
*/
- public String[][] convertToStringFrame(InputStream input, int rows, int cols)
+ public String[][] convertToStringFrame(InputStream input, int rows, int cols) throws IOException {
+ return convertToStringFrame(input, rows, cols, DataExpression.FORMAT_TYPE_VALUE_TEXT);
+ }
+
+ /**
+ * Converts an input stream of a string frame in csv or textcell format
+ * into a dense string array.
+ *
+ * @param input InputStream to a string frame in csv or textcell format
+ * @param rows number of rows in the frame
+ * @param cols number of columns in the frame
+ * @param format input format of the given stream
+ * @return frame as a two-dimensional string array
+ * @throws IOException if IOException occurs
+ */
+ public String[][] convertToStringFrame(InputStream input, int rows, int cols, String format)
throws IOException
{
String[][] ret = null;
+
+ //sanity check input format
+ if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
+ ||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format)
+ ||DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format))) {
+ throw new IOException("Invalid input format (expected: csv, text or mm): "+format);
+ }
try {
- //read input matrix
- FrameReaderTextCell reader = (FrameReaderTextCell)FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo);
+ //read input frame
+ InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ?
+ InputInfo.CSVInputInfo : InputInfo.TextCellInputInfo;
+ FrameReader reader = FrameReaderFactory.createFrameReader(iinfo);
FrameBlock mb = reader.readFrameFromInputStream(input, rows, cols);
//convert to double array
@@ -551,7 +580,6 @@ public class Connection implements Closeable
return ret;
}
-
////////////////////////////////////////////
// Read transform meta data
////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 3aac76b..321735d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.io;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.util.LinkedList;
import org.apache.hadoop.fs.FileStatus;
@@ -42,7 +43,6 @@ import org.apache.sysml.runtime.util.UtilFunctions;
*/
public abstract class FrameReader
{
-
public abstract FrameBlock readFrameFromHDFS( String fname, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException, DMLRuntimeException;
@@ -58,6 +58,21 @@ public abstract class FrameReader
return readFrameFromHDFS(fname, getDefSchema(clen), getDefColNames(clen), rlen, clen);
}
+ public abstract FrameBlock readFrameFromInputStream( InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
+ throws IOException, DMLRuntimeException;
+
+ public FrameBlock readFrameFromInputStream( InputStream is, ValueType[] schema, long rlen, long clen )
+ throws IOException, DMLRuntimeException
+ {
+ return readFrameFromInputStream(is, schema, getDefColNames(schema.length), rlen, clen);
+ }
+
+ public FrameBlock readFrameFromInputStream( InputStream is, long rlen, long clen )
+ throws IOException, DMLRuntimeException
+ {
+ return readFrameFromInputStream(is, getDefSchema(clen), getDefColNames(clen), rlen, clen);
+ }
+
public ValueType[] getDefSchema( long clen )
throws IOException, DMLRuntimeException
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index a9df026..32feea3 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -20,6 +20,7 @@
package org.apache.sysml.runtime.io;
import java.io.IOException;
+import java.io.InputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -38,7 +39,6 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock;
*/
public class FrameReaderBinaryBlock extends FrameReader
{
-
@Override
public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException, DMLRuntimeException
@@ -61,6 +61,13 @@ public class FrameReaderBinaryBlock extends FrameReader
return ret;
}
+
+ @Override
+ public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
+ throws IOException, DMLRuntimeException
+ {
+ throw new DMLRuntimeException("Not implemented yet.");
+ }
protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen )
throws IOException, DMLRuntimeException
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index 707071f..e86cbe2 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -20,6 +20,7 @@
package org.apache.sysml.runtime.io;
import java.io.IOException;
+import java.io.InputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -38,6 +40,7 @@ import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.Pair;
import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.util.InputStreamInputFormat;
import org.apache.sysml.runtime.util.UtilFunctions;
/**
@@ -83,6 +86,24 @@ public class FrameReaderTextCSV extends FrameReader
return ret;
}
+
+ @Override
+ public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names,
+ long rlen, long clen)
+ throws IOException, DMLRuntimeException
+ {
+ //allocate output frame block
+ ValueType[] lschema = createOutputSchema(schema, clen);
+ String[] lnames = createOutputNames(names, clen);
+ FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
+
+ //core read (sequential/parallel)
+ InputStreamInputFormat informat = new InputStreamInputFormat(is);
+ InputSplit split = informat.getSplits(null, 1)[0];
+ readCSVFrameFromInputSplit(split, informat, null, ret, schema, names, rlen, clen, 0, true);
+
+ return ret;
+ }
protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen)
@@ -96,7 +117,7 @@ public class FrameReaderTextCSV extends FrameReader
readCSVFrameFromInputSplit(splits[i], informat, job, dest, schema, names, rlen, clen, 0, i==0);
}
- protected final void readCSVFrameFromInputSplit( InputSplit split, TextInputFormat informat, JobConf job,
+ protected final void readCSVFrameFromInputSplit( InputSplit split, InputFormat<LongWritable,Text> informat, JobConf job,
FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen, int rl, boolean first)
throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
index e8be829..548452f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
@@ -47,7 +47,6 @@ import org.apache.sysml.runtime.util.UtilFunctions;
*/
public class FrameReaderTextCell extends FrameReader
{
-
@Override
public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException, DMLRuntimeException
@@ -71,11 +70,7 @@ public class FrameReaderTextCell extends FrameReader
return ret;
}
- public final FrameBlock readFrameFromInputStream(InputStream is, long rlen, long clen)
- throws IOException, DMLRuntimeException {
- return readFrameFromInputStream(is, getDefSchema(clen), getDefColNames(clen), rlen, clen);
- }
-
+ @Override
public final FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException, DMLRuntimeException
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index 75de6a8..ffe290e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.io;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -54,6 +55,9 @@ public abstract class MatrixReader
public abstract MatrixBlock readMatrixFromHDFS( String fname, long rlen, long clen, int brlen, int bclen, long estnnz )
throws IOException, DMLRuntimeException;
+ public abstract MatrixBlock readMatrixFromInputStream( InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz )
+ throws IOException, DMLRuntimeException;
+
public static Path[] getSequenceFilePaths( FileSystem fs, Path file )
throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 015ae25..4c8549e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -20,6 +20,7 @@
package org.apache.sysml.runtime.io;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
@@ -73,6 +74,13 @@ public class ReaderBinaryBlock extends MatrixReader
return ret;
}
+
+ @Override
+ public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz)
+ throws IOException, DMLRuntimeException
+ {
+ throw new DMLRuntimeException("Not implemented yet.");
+ }
public ArrayList<IndexedMatrixValue> readIndexedMatrixBlocksFromHDFS(String fname, long rlen, long clen, int brlen, int bclen)
throws IOException, DMLRuntimeException
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index f148ceb..dcf9e7b 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -20,6 +20,7 @@
package org.apache.sysml.runtime.io;
import java.io.IOException;
+import java.io.InputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,7 +35,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
public class ReaderBinaryCell extends MatrixReader
{
-
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int brlen, int bclen, long estnnz)
throws IOException, DMLRuntimeException
@@ -60,6 +60,13 @@ public class ReaderBinaryCell extends MatrixReader
return ret;
}
+ @Override
+ public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz)
+ throws IOException, DMLRuntimeException
+ {
+ throw new DMLRuntimeException("Not implemented yet.");
+ }
+
@SuppressWarnings("deprecation")
private void readBinaryCellMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen )
throws IOException
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
index 9d8f368..6256955 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
@@ -21,12 +21,14 @@ package org.apache.sysml.runtime.io;
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,15 +43,12 @@ import org.apache.sysml.runtime.util.UtilFunctions;
public class ReaderTextCSV extends MatrixReader
{
-
private CSVFileFormatProperties _props = null;
- public ReaderTextCSV(CSVFileFormatProperties props)
- {
+ public ReaderTextCSV(CSVFileFormatProperties props) {
_props = props;
}
-
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int brlen, int bclen, long estnnz)
throws IOException, DMLRuntimeException
@@ -77,12 +76,31 @@ public class ReaderTextCSV extends MatrixReader
return ret;
}
-
+
+ @Override
+ public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz)
+ throws IOException, DMLRuntimeException
+ {
+ //allocate output matrix block
+ MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
+
+ //core read
+ long lnnz = readCSVMatrixFromInputStream(is, "external inputstream", ret, new MutableInt(0), rlen, clen,
+ brlen, bclen, _props.hasHeader(), _props.getDelim(), _props.isFill(), _props.getFillValue(), true);
+
+ //finally check if change of sparse/dense block representation required
+ ret.setNonZeros( lnnz );
+ ret.examSparsity();
+
+ return ret;
+ }
+
@SuppressWarnings("unchecked")
private MatrixBlock readCSVMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest,
long rlen, long clen, int brlen, int bclen, boolean hasHeader, String delim, boolean fill, double fillValue )
throws IOException
{
+ //prepare file paths in alphanumeric order
ArrayList<Path> files=new ArrayList<Path>();
if(fs.isDirectory(path)) {
for(FileStatus stat: fs.listStatus(path, CSVReblockMR.hiddenFileFilter))
@@ -92,107 +110,118 @@ public class ReaderTextCSV extends MatrixReader
else
files.add(path);
+ //determine matrix size via additional pass if required
if ( dest == null ) {
dest = computeCSVSize(files, job, fs, hasHeader, delim, fill, fillValue);
clen = dest.getNumColumns();
}
- boolean sparse = dest.isInSparseFormat();
+ //actual read of individual files
+ long lnnz = 0;
+ MutableInt row = new MutableInt(0);
+ for(int fileNo=0; fileNo<files.size(); fileNo++) {
+ lnnz += readCSVMatrixFromInputStream(fs.open(files.get(fileNo)), path.toString(), dest,
+ row, rlen, clen, brlen, bclen, hasHeader, delim, fill, fillValue, fileNo==0);
+ }
+
+ //post processing
+ dest.setNonZeros( lnnz );
- /////////////////////////////////////////
+ return dest;
+ }
+
+ private long readCSVMatrixFromInputStream( InputStream is, String srcInfo, MatrixBlock dest, MutableInt rowPos,
+ long rlen, long clen, int brlen, int bclen, boolean hasHeader, String delim, boolean fill, double fillValue, boolean first )
+ throws IOException
+ {
+ boolean sparse = dest.isInSparseFormat();
String value = null;
- int row = 0;
- int col = -1;
+ int row = rowPos.intValue();
double cellValue = 0;
long lnnz = 0;
- for(int fileNo=0; fileNo<files.size(); fileNo++)
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ if(first && hasHeader )
+ br.readLine(); //ignore header
+
+ // Read the data
+ boolean emptyValuesFound = false;
+ try
{
- BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo))));
- if(fileNo==0 && hasHeader )
- br.readLine(); //ignore header
-
- // Read the data
- boolean emptyValuesFound = false;
- try
+ if( sparse ) //SPARSE<-value
{
- if( sparse ) //SPARSE<-value
+ while( (value=br.readLine())!=null ) //foreach line
{
- while( (value=br.readLine())!=null ) //foreach line
+ String cellStr = value.toString().trim();
+ emptyValuesFound = false;
+ String[] parts = IOUtilFunctions.split(cellStr, delim);
+ int col = 0;
+
+ for(String part : parts) //foreach cell
{
- String cellStr = value.toString().trim();
- emptyValuesFound = false;
- String[] parts = IOUtilFunctions.split(cellStr, delim);
- col = 0;
-
- for(String part : parts) //foreach cell
- {
- part = part.trim();
- if ( part.isEmpty() ) {
- emptyValuesFound = true;
- cellValue = fillValue;
- }
- else {
- cellValue = UtilFunctions.parseToDouble(part);
- }
- if ( cellValue != 0 ) {
- dest.appendValue(row, col, cellValue);
- lnnz++;
- }
- col++;
+ part = part.trim();
+ if ( part.isEmpty() ) {
+ emptyValuesFound = true;
+ cellValue = fillValue;
}
-
- //sanity checks for empty values and number of columns
- IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
- IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, parts, clen);
- row++;
+ else {
+ cellValue = UtilFunctions.parseToDouble(part);
+ }
+ if ( cellValue != 0 ) {
+ dest.appendValue(row, col, cellValue);
+ lnnz++;
+ }
+ col++;
}
- }
- else //DENSE<-value
+
+ //sanity checks for empty values and number of columns
+ IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
+ IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(srcInfo, cellStr, parts, clen);
+ row++;
+ }
+ }
+ else //DENSE<-value
+ {
+ while( (value=br.readLine())!=null ) //foreach line
{
- while( (value=br.readLine())!=null ) //foreach line
+ String cellStr = value.toString().trim();
+ emptyValuesFound = false;
+ String[] parts = IOUtilFunctions.split(cellStr, delim);
+ int col = 0;
+
+ for( String part : parts ) //foreach cell
{
- String cellStr = value.toString().trim();
- emptyValuesFound = false;
- String[] parts = IOUtilFunctions.split(cellStr, delim);
- col = 0;
-
- for( String part : parts ) //foreach cell
- {
- part = part.trim();
- if ( part.isEmpty() ) {
- emptyValuesFound = true;
- cellValue = fillValue;
- }
- else {
- cellValue = UtilFunctions.parseToDouble(part);
- }
- if ( cellValue != 0 ) {
- dest.setValueDenseUnsafe(row, col, cellValue);
- lnnz++;
- }
- col++;
+ part = part.trim();
+ if ( part.isEmpty() ) {
+ emptyValuesFound = true;
+ cellValue = fillValue;
+ }
+ else {
+ cellValue = UtilFunctions.parseToDouble(part);
+ }
+ if ( cellValue != 0 ) {
+ dest.setValueDenseUnsafe(row, col, cellValue);
+ lnnz++;
}
-
- //sanity checks for empty values and number of columns
- IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
- IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, parts, clen);
- row++;
+ col++;
}
+
+ //sanity checks for empty values and number of columns
+ IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
+ IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(srcInfo, cellStr, parts, clen);
+ row++;
}
}
- finally {
- IOUtilFunctions.closeSilently(br);
- }
+ }
+ finally {
+ IOUtilFunctions.closeSilently(br);
}
- //post processing
- dest.setNonZeros( lnnz );
-
- return dest;
+ rowPos.setValue(row);
+ return lnnz;
}
- private MatrixBlock computeCSVSize ( List<Path> files, JobConf job, FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue)
+ private MatrixBlock computeCSVSize( List<Path> files, JobConf job, FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue)
throws IOException
{
int nrow = -1;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 15d4858..75b3bd9 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -20,6 +20,7 @@
package org.apache.sysml.runtime.io;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -111,6 +112,13 @@ public class ReaderTextCSVParallel extends MatrixReader
return ret;
}
+ @Override
+ public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz)
+ throws IOException, DMLRuntimeException
+ { //placeholder: ReaderTextCSVParallel has no input-stream read path yet
+ throw new DMLRuntimeException("Not implemented yet."); //unconditional; all arguments are ignored
+ }
+
private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job,
MatrixBlock dest, long rlen, long clen, int brlen, int bclen,
boolean hasHeader, String delim, boolean fill, double fillValue)
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index 1c9cba5..3b93c33 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -79,6 +79,7 @@ public class ReaderTextCell extends MatrixReader
return ret;
}
+ @Override
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz)
throws IOException, DMLRuntimeException
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index f693455..9501b6d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -20,6 +20,7 @@
package org.apache.sysml.runtime.io;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -102,6 +103,13 @@ public class ReaderTextCellParallel extends MatrixReader
return ret;
}
+ @Override
+ public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz)
+ throws IOException, DMLRuntimeException
+ { //placeholder: ReaderTextCellParallel has no input-stream read path yet
+ throw new DMLRuntimeException("Not implemented yet."); //unconditional; all arguments are ignored
+ }
+
private void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket )
throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java b/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
new file mode 100644
index 0000000..83641f6
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Custom input format and record reader to redirect common implementation of csv read
+ * over record readers (which are required for the parallel readers) to an input stream.
+ *
+ */
+public class InputStreamInputFormat implements InputFormat<LongWritable, Text>
+{
+    //stream handed to every record reader created by this format
+    private final InputStream _input;
+
+    /**
+     * Creates an input format that serves its records from the given stream.
+     *
+     * @param is input stream the record reader will read lines from
+     */
+    public InputStreamInputFormat(InputStream is) {
+        _input = is;
+    }
+
+    /**
+     * Returns a single placeholder split; both arguments are ignored.
+     *
+     * @param job job configuration (unused)
+     * @param numSplits requested number of splits (unused)
+     * @return array with exactly one dummy split
+     */
+    @Override
+    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+        //return dummy handle - stream accessed purely over record reader
+        return new InputSplit[]{new FileSplit(null)};
+    }
+
+    /**
+     * Creates a line-based record reader over the wrapped stream; the split,
+     * job, and reporter arguments are ignored.
+     *
+     * @return record reader that emits one text value per line of the stream
+     */
+    @Override
+    public RecordReader<LongWritable,Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+        return new InputStreamRecordReader(_input);
+    }
+
+    /**
+     * Record reader that returns the lines of an input stream as text values.
+     * Keys are never populated, and progress/position are always reported as 0.
+     */
+    private static class InputStreamRecordReader implements RecordReader<LongWritable, Text>
+    {
+        private final BufferedReader _reader;
+
+        public InputStreamRecordReader(InputStream is) {
+            //decode explicitly as UTF-8 (the encoding used by hadoop Text) rather
+            //than the platform default charset, so results do not depend on the JVM locale
+            _reader = new BufferedReader(new InputStreamReader(
+                is, java.nio.charset.StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public LongWritable createKey() {
+            return new LongWritable();
+        }
+
+        @Override
+        public Text createValue() {
+            return new Text();
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            //progress is not tracked for streams
+            return 0;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            //position is not tracked for streams
+            return 0;
+        }
+
+        /**
+         * Reads the next line into {@code value}; {@code key} is left untouched.
+         *
+         * @return true if a line was read, false at end of stream
+         */
+        @Override
+        public boolean next(LongWritable key, Text value) throws IOException {
+            String line = _reader.readLine();
+            if( line != null )
+                value.set(line);
+            return (line != null);
+        }
+
+        @Override
+        public void close() throws IOException {
+            //closing the buffered reader also closes the wrapped input stream
+            _reader.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
new file mode 100644
index 0000000..05ee456
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.jmlc;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+import org.apache.sysml.api.jmlc.Connection;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.runtime.io.FrameWriter;
+import org.apache.sysml.runtime.io.FrameWriterFactory;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
+import org.apache.sysml.runtime.io.MatrixWriter;
+import org.apache.sysml.runtime.io.MatrixWriterFactory;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+/**
+ * Writes random matrices and frames in csv or text cell format, reads them
+ * back through the input-stream methods of the JMLC {@code Connection}, and
+ * compares the result with the generated data.
+ */
+public class JMLCInputStreamReadTest extends AutomatedTestBase
+{
+    private final static String TEST_NAME = "jmlc";
+    private final static String TEST_DIR = "functions/jmlc/";
+    private final static String TEST_CLASS_DIR = TEST_DIR + JMLCInputStreamReadTest.class.getSimpleName() + "/";
+
+    private final static int rows = 700;
+    private final static int cols = 3;
+
+    //sparsity used for the dense and the sparse test variants, respectively
+    private final static double sparsity1 = 0.7;
+    private final static double sparsity2 = 0.1;
+
+    @Override
+    public void setUp() {
+        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { "R" }) );
+    }
+
+    @Test
+    public void testInputStreamReadMatrixDenseCSV() throws IOException {
+        runJMLCInputStreamReadTest(DataType.MATRIX, false, "csv", false);
+    }
+
+    @Test
+    public void testInputStreamReadMatrixDenseText() throws IOException {
+        runJMLCInputStreamReadTest(DataType.MATRIX, false, "text", false);
+    }
+
+    @Test
+    public void testInputStreamReadMatrixSparseCSV() throws IOException {
+        runJMLCInputStreamReadTest(DataType.MATRIX, true, "csv", false);
+    }
+
+    @Test
+    public void testInputStreamReadMatrixSparseText() throws IOException {
+        runJMLCInputStreamReadTest(DataType.MATRIX, true, "text", false);
+    }
+
+    @Test
+    public void testInputStreamReadFrameDenseCSV() throws IOException {
+        runJMLCInputStreamReadTest(DataType.FRAME, false, "csv", false);
+    }
+
+    @Test
+    public void testInputStreamReadFrameDenseText() throws IOException {
+        runJMLCInputStreamReadTest(DataType.FRAME, false, "text", false);
+    }
+
+    @Test
+    public void testInputStreamReadFrameSparseCSV() throws IOException {
+        runJMLCInputStreamReadTest(DataType.FRAME, true, "csv", false);
+    }
+
+    @Test
+    public void testInputStreamReadFrameSparseText() throws IOException {
+        runJMLCInputStreamReadTest(DataType.FRAME, true, "text", false);
+    }
+
+    @Test
+    public void testInputStreamReadFrameDenseCSVMeta() throws IOException {
+        runJMLCInputStreamReadTest(DataType.FRAME, false, "csv", true);
+    }
+
+    @Test
+    public void testInputStreamReadFrameDenseTextMeta() throws IOException {
+        runJMLCInputStreamReadTest(DataType.FRAME, false, "text", true);
+    }
+
+    @Test
+    public void testInputStreamReadFrameSparseCSVMeta() throws IOException {
+        runJMLCInputStreamReadTest(DataType.FRAME, true, "csv", true);
+    }
+
+    @Test
+    public void testInputStreamReadFrameSparseTextMeta() throws IOException {
+        runJMLCInputStreamReadTest(DataType.FRAME, true, "text", true);
+    }
+
+    /**
+     * Writes generated data to the test output location, reads it back from a
+     * file input stream via the JMLC connection, and compares both versions.
+     *
+     * @param dt data type under test (matrix or frame); anything else fails
+     * @param sparse if true the data is generated with sparsity2, otherwise sparsity1
+     * @param format "csv" selects the csv writer, any other value the text cell writer
+     * @param metaData if true, column names CC0..CC(cols-1) are set on the frame before writing
+     * @throws IOException declared for callers; failures inside the try block are
+     *         rethrown wrapped in a RuntimeException
+     */
+    private void runJMLCInputStreamReadTest(DataType dt, boolean sparse, String format, boolean metaData )
+        throws IOException
+    {
+        TestConfiguration config = getTestConfiguration(TEST_NAME);
+        loadTestConfiguration(config);
+
+        //generate inputs
+        OutputInfo oinfo = format.equals("csv") ? OutputInfo.CSVOutputInfo : OutputInfo.TextCellOutputInfo;
+        double[][] data = TestUtils.round(getRandomMatrix(rows, cols, 0.51, 7.49, sparse?sparsity2:sparsity1, 7));
+
+        Connection conn = new Connection();
+
+        try
+        {
+            if( dt == DataType.MATRIX )
+            {
+                //write input matrix
+                MatrixBlock mb = DataConverter.convertToMatrixBlock(data);
+                MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(oinfo);
+                writer.writeMatrixToHDFS(mb, output("X"), rows, cols, -1, -1, -1);
+
+                //read matrix from input stream; try-with-resources ensures the
+                //file handle is released before the file is deleted below
+                double[][] data2;
+                try( FileInputStream fis = new FileInputStream(output("X")) ) {
+                    data2 = conn.convertToDoubleMatrix(fis, rows, cols, format);
+                }
+
+                //compare matrix result
+                TestUtils.compareMatrices(data, data2, rows, cols, 0);
+            }
+            else if( dt == DataType.FRAME )
+            {
+                //write input frame
+                String[][] fdata = FrameTransformTest.createFrameData(data, "V");
+                fdata[3][1] = "\"ab\"\"cdef\""; //test quoted tokens w/ inner quotes
+                if( format.equals("csv") )
+                    fdata[7][2] = "\"a,bc def\""; //test delimiter and space tokens
+                FrameBlock fb = DataConverter.convertToFrameBlock(fdata);
+                if( metaData ) {
+                    fb.setColumnNames(IntStream.range(0,cols).mapToObj(i -> "CC"+i)
+                        .collect(Collectors.toList()).toArray(new String[0]));
+                }
+                FrameWriter writer = FrameWriterFactory.createFrameWriter(oinfo);
+                writer.writeFrameToHDFS(fb, output("X"), rows, cols);
+
+                //read frame from input stream; try-with-resources ensures the
+                //file handle is released before the file is deleted below
+                String[][] fdata2;
+                try( FileInputStream fis = new FileInputStream(output("X")) ) {
+                    fdata2 = conn.convertToStringFrame(fis, rows, cols, format);
+                }
+
+                //compare frame result
+                TestUtils.compareFrames(fdata, fdata2, rows, cols);
+            }
+            else {
+                throw new IOException("Unsupported data type: "+dt.name());
+            }
+        }
+        catch(Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        finally {
+            MapReduceTool.deleteFileIfExistOnHDFS(output("X"));
+            IOUtilFunctions.closeSilently(conn);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
index 9bf6a1c..9eb6af1 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
@@ -34,6 +34,7 @@ import org.junit.runners.Suite;
FrameReadMetaTest.class,
FrameTransformTest.class,
JMLCInputOutputTest.class,
+ JMLCInputStreamReadTest.class,
ReuseModelVariablesTest.class,
SystemTMulticlassSVMScoreTest.class
})