You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/02/13 05:34:33 UTC
systemml git commit: [SYSTEMML-2135] I/O operation support for
matrices with zero rows/cols
Repository: systemml
Updated Branches:
refs/heads/master 85cb9e34e -> d753af90d
[SYSTEMML-2135] I/O operation support for matrices with zero rows/cols
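This patch completes the support for matrices with zero rows or columns
by extending the readers and writers for all formats and all backends
accordingly. Empty text-cell outputs now use the marker line "0 0 0"
(instead of "1 1 0"), which all readers skip. Writing such matrices as
CSV is explicitly rejected with an error. Furthermore, this patch
includes a number of minor modifications to prevent incorrect
validation errors.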
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d753af90
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d753af90
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d753af90
Branch: refs/heads/master
Commit: d753af90da8bfd07e4feac34db476e139eeb217d
Parents: 85cb9e3
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Feb 12 20:40:20 2018 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Mon Feb 12 20:40:20 2018 -0800
----------------------------------------------------------------------
.../org/apache/sysml/parser/DataExpression.java | 37 ++--
.../parfor/ResultMergeLocalFile.java | 4 +-
.../instructions/spark/WriteSPInstruction.java | 5 +
.../spark/utils/RDDConverterUtils.java | 1 +
.../instructions/spark/utils/SparkUtils.java | 4 +-
.../sysml/runtime/io/FrameWriterTextCell.java | 7 +-
.../sysml/runtime/io/IOUtilFunctions.java | 3 +
.../apache/sysml/runtime/io/ReaderTextCell.java | 6 +-
.../runtime/io/ReaderTextCellParallel.java | 14 +-
.../sysml/runtime/io/WriterMatrixMarket.java | 7 +-
.../apache/sysml/runtime/io/WriterTextCSV.java | 5 +-
.../sysml/runtime/io/WriterTextCSVParallel.java | 22 +-
.../apache/sysml/runtime/io/WriterTextCell.java | 8 +-
.../runtime/matrix/MatrixCharacteristics.java | 3 +-
.../apache/sysml/runtime/matrix/WriteCSVMR.java | 7 +
.../matrix/data/TextToBinaryCellConverter.java | 14 +-
.../runtime/matrix/mapred/ReblockMapper.java | 15 +-
.../functions/data/WriteReadZeroDimsTest.java | 217 +++++++++++++++++++
.../scripts/functions/data/ZeroDimDataRead.dml | 29 +++
.../scripts/functions/data/ZeroDimDataWrite.dml | 23 ++
.../functions/data/ZPackageSuite.java | 1 +
21 files changed, 359 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/parser/DataExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java b/src/main/java/org/apache/sysml/parser/DataExpression.java
index d17443f..9764dfc 100644
--- a/src/main/java/org/apache/sysml/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysml/parser/DataExpression.java
@@ -682,7 +682,6 @@ public class DataExpression extends DataIdentifier
// process 2nd line of MatrixMarket format -- must have size information
-
String secondLine = headerLines[1];
String[] sizeInfo = secondLine.trim().split("\\s+");
if (sizeInfo.length != 3){
@@ -690,39 +689,37 @@ public class DataExpression extends DataIdentifier
headerLines[1] + ". Only supported format in MatrixMarket file has size line: <NUM ROWS> <NUM COLS> <NUM NON-ZEROS>, where each value is an integer.", conditional);
}
- long rowsCount = -1, colsCount = -1, nnzCount = -1;
try {
- rowsCount = Long.parseLong(sizeInfo[0]);
- if (rowsCount < 1)
+ long rowsCount = Long.parseLong(sizeInfo[0]);
+ if (rowsCount < 0)
throw new Exception("invalid rows count");
addVarParam(READROWPARAM, new IntIdentifier(rowsCount, this));
} catch (Exception e) {
- raiseValidateError(
- "In MatrixMarket file " + getVarParam(IO_FILENAME) + " invalid row count " + sizeInfo[0]
- + " (must be long value >= 1). Sizing info line from file: " + headerLines[1],
- conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+ raiseValidateError("In MatrixMarket file " + getVarParam(IO_FILENAME) + " invalid row count "
+ + sizeInfo[0] + " (must be long value >= 0). Sizing info line from file: " + headerLines[1],
+ conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
try {
- colsCount = Long.parseLong(sizeInfo[1]);
- if (colsCount < 1)
+ long colsCount = Long.parseLong(sizeInfo[1]);
+ if (colsCount < 0)
throw new Exception("invalid cols count");
addVarParam(READCOLPARAM, new IntIdentifier(colsCount, this));
} catch (Exception e) {
raiseValidateError("In MatrixMarket file " + getVarParam(IO_FILENAME) + " invalid column count "
- + sizeInfo[1] + " (must be long value >= 1). Sizing info line from file: "
+ + sizeInfo[1] + " (must be long value >= 0). Sizing info line from file: "
+ headerLines[1], conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
try {
- nnzCount = Long.parseLong(sizeInfo[2]);
- if (nnzCount < 1)
+ long nnzCount = Long.parseLong(sizeInfo[2]);
+ if (nnzCount < 0)
throw new Exception("invalid nnz count");
addVarParam("nnz", new IntIdentifier(nnzCount, this));
} catch (Exception e) {
raiseValidateError("In MatrixMarket file " + getVarParam(IO_FILENAME)
+ " invalid number non-zeros " + sizeInfo[2]
- + " (must be long value >= 1). Sizing info line from file: " + headerLines[1],
+ + " (must be long value >= 0). Sizing info line from file: " + headerLines[1],
conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
}
@@ -857,17 +854,17 @@ public class DataExpression extends DataIdentifier
if ( !isCSV && ConfigurationManager.getCompilerConfig()
.getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv format / jmlc api
&& (getVarParam(READROWPARAM) == null || getVarParam(READCOLPARAM) == null) ) {
- raiseValidateError("Missing or incomplete dimension information in read statement: "
- + mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+ raiseValidateError("Missing or incomplete dimension information in read statement: "
+ + mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
if (getVarParam(READROWPARAM) instanceof ConstIdentifier
- && getVarParam(READCOLPARAM) instanceof ConstIdentifier)
+ && getVarParam(READCOLPARAM) instanceof ConstIdentifier)
{
// these are strings that are long values
Long dim1 = (getVarParam(READROWPARAM) == null) ? null : Long.valueOf( getVarParam(READROWPARAM).toString());
- Long dim2 = (getVarParam(READCOLPARAM) == null) ? null : Long.valueOf( getVarParam(READCOLPARAM).toString());
- if ( !isCSV && (dim1 <= 0 || dim2 <= 0) && ConfigurationManager
+ Long dim2 = (getVarParam(READCOLPARAM) == null) ? null : Long.valueOf( getVarParam(READCOLPARAM).toString());
+ if ( !isCSV && (dim1 < 0 || dim2 < 0) && ConfigurationManager
.getCompilerConfig().getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) ) {
raiseValidateError("Invalid dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
@@ -877,7 +874,7 @@ public class DataExpression extends DataIdentifier
getOutput().setDimensions(dim1, dim2);
} else if (!isCSV && ((dim1 != null) || (dim2 != null))) {
raiseValidateError("Partial dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
- }
+ }
}
// initialize block dimensions to UNKNOWN
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index dcb85c8..0768328 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -849,10 +849,10 @@ public class ResultMergeLocalFile extends ResultMerge
}
}
}
- }
+ }
if( !written )
- out.write("1 1 0\n");
+ out.write(IOUtilFunctions.EMPTY_TEXT_LINE);
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 5894e76..76f8c8f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -189,6 +189,11 @@ public class WriteSPInstruction extends SPInstruction {
}
else if( oi == OutputInfo.CSVOutputInfo )
{
+ if( mc.getRows() == 0 || mc.getCols() == 0 ) {
+ throw new IOException("Write of matrices with zero rows or columns"
+ + " not supported ("+mc.getRows()+"x"+mc.getCols()+").");
+ }
+
LongAccumulator aNnz = null;
//piggyback nnz computation on actual write
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index e3ab541..2dedc91 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -535,6 +535,7 @@ public class RDDConverterUtils
st.reset( strVal );
long row = st.nextLong();
long col = st.nextLong();
+ if( row == 0 || col == 0 ) continue;
double val = st.nextDouble();
//flush buffer if necessary
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 977cd10..801fe5a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -197,8 +197,8 @@ public class SparkUtils
public static JavaPairRDD<MatrixIndexes, MatrixBlock> getEmptyBlockRDD( JavaSparkContext sc, MatrixCharacteristics mc )
{
//compute degree of parallelism and block ranges
- long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.max(
- mc.getRows(), mc.getRowsPerBlock()), Math.max(mc.getCols(), mc.getColsPerBlock()));
+ long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.min(
+ Math.max(mc.getRows(),1), mc.getRowsPerBlock()), Math.min(Math.max(mc.getCols(),1), mc.getColsPerBlock()));
int par = (int) Math.min(Math.max(SparkExecutionContext.getDefaultParallelism(true),
Math.ceil(size/InfrastructureAnalyzer.getHDFSBlockSize())), mc.getNumBlocks());
long pNumBlocks = (long)Math.ceil((double)mc.getNumBlocks()/par);
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
index 2bbd206..be11fea 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
@@ -128,12 +128,11 @@ public class FrameWriterTextCell extends FrameWriter
}
//handle empty result
- if ( !entriesWritten ) {
- br.write("1 1 0\n");
- }
+ if ( !entriesWritten )
+ br.write(IOUtilFunctions.EMPTY_TEXT_LINE);
}
finally {
IOUtilFunctions.closeSilently(br);
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 526ad98..4d388c9 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -55,6 +55,9 @@ public class IOUtilFunctions
{
private static final Log LOG = LogFactory.getLog(UtilFunctions.class.getName());
+ //for empty text lines we use 0-0 despite for 1-based indexing in order
+ //to allow matrices with zero rows and columns (consistent with R)
+ public static final String EMPTY_TEXT_LINE = "0 0 0\n";
private static final char CSV_QUOTE_CHAR = '"';
public static FileSystem getFileSystem(String fname) throws IOException {
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index dd5ae51..918ab6b 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -128,6 +128,7 @@ public class ReaderTextCell extends MatrixReader
st.reset( value.toString() ); //reinit tokenizer
row = st.nextInt() - 1;
col = st.nextInt() - 1;
+ if(row == -1 || col == -1) continue;
double lvalue = st.nextDouble();
dest.appendValue(row, col, lvalue);
}
@@ -141,6 +142,7 @@ public class ReaderTextCell extends MatrixReader
st.reset( value.toString() ); //reinit tokenizer
row = st.nextInt()-1;
col = st.nextInt()-1;
+ if(row == -1 || col == -1) continue;
double lvalue = st.nextDouble();
a.set( row, col, lvalue );
}
@@ -174,7 +176,7 @@ public class ReaderTextCell extends MatrixReader
private static void readRawTextCellMatrixFromInputStream( InputStream is, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket )
throws IOException
{
- BufferedReader br = new BufferedReader(new InputStreamReader( is ));
+ BufferedReader br = new BufferedReader(new InputStreamReader( is ));
boolean sparse = dest.isInSparseFormat();
String value = null;
@@ -214,6 +216,7 @@ public class ReaderTextCell extends MatrixReader
st.reset( value ); //reinit tokenizer
row = st.nextInt()-1;
col = st.nextInt()-1;
+ if(row == -1 || col == -1) continue;
double lvalue = st.nextDouble();
dest.appendValue(row, col, lvalue);
}
@@ -227,6 +230,7 @@ public class ReaderTextCell extends MatrixReader
st.reset( value ); //reinit tokenizer
row = st.nextInt()-1;
col = st.nextInt()-1;
+ if(row == -1 || col == -1) continue;
double lvalue = st.nextDouble();
a.set( row, col, lvalue );
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index b692cb1..6087210 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -83,7 +83,7 @@ public class ReaderTextCellParallel extends MatrixReader
throws IOException, DMLRuntimeException
{
//prepare file access
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+ JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path( fname );
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
@@ -214,10 +214,12 @@ public class ReaderTextCellParallel extends MatrixReader
st.reset( value.toString() ); //reinit tokenizer
row = st.nextInt()-1;
col = st.nextInt()-1;
- double lvalue = st.nextDoubleForParallel();
- synchronized( _dest ){ //sparse requires lock
- _dest.appendValue(row, col, lvalue);
- lnnz++;
+ if(row != -1 || col != -1) {
+ double lvalue = st.nextDoubleForParallel();
+ synchronized( _dest ){ //sparse requires lock
+ _dest.appendValue(row, col, lvalue);
+ lnnz++;
+ }
}
}
}
@@ -230,6 +232,7 @@ public class ReaderTextCellParallel extends MatrixReader
st.reset( value.toString() ); //reinit tokenizer
row = st.nextInt() - 1;
col = st.nextInt() - 1;
+ if(row == -1 || col == -1) continue;
double lvalue = st.nextDoubleForParallel();
buff.addCell(row, col, lvalue);
@@ -254,6 +257,7 @@ public class ReaderTextCellParallel extends MatrixReader
st.reset( value.toString() ); //reinit tokenizer
row = st.nextInt()-1;
col = st.nextInt()-1;
+ if(row == -1 || col == -1) continue;
double lvalue = st.nextDoubleForParallel();
a.set( row, col, lvalue );
lnnz += (lvalue!=0) ? 1 : 0;
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
index f862f3d..c9f42bf 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
@@ -74,7 +74,7 @@ public class WriterMatrixMarket extends MatrixWriter
FSDataOutputStream writer = null;
try {
writer = fs.create(path);
- writer.writeBytes("1 1 0");
+ writer.writeBytes(IOUtilFunctions.EMPTY_TEXT_LINE);
}
finally {
IOUtilFunctions.closeSilently(writer);
@@ -156,9 +156,8 @@ public class WriterMatrixMarket extends MatrixWriter
}
//handle empty result
- if ( src.isEmptyBlock(false) && rl==0 ) {
- br.write("1 1 0\n");
- }
+ if ( src.isEmptyBlock(false) && rl==0 )
+ br.write(IOUtilFunctions.EMPTY_TEXT_LINE);
}
finally {
IOUtilFunctions.closeSilently(br);
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
index 7b39eb3..a3015f2 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
@@ -58,9 +58,10 @@ public class WriterTextCSV extends MatrixWriter
throws IOException, DMLRuntimeException
{
//validity check matrix dimensions
- if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
+ if( src.getNumRows() != rlen || src.getNumColumns() != clen )
throw new IOException("Matrix dimensions mismatch with metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
- }
+ if( rlen == 0 || clen == 0 )
+ throw new IOException("Write of matrices with zero rows or columns not supported ("+rlen+"x"+clen+").");
//prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
index 2bef173..2c56220 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
@@ -46,11 +46,11 @@ public class WriterTextCSVParallel extends WriterTextCSV
}
@Override
- protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, CSVFileFormatProperties csvprops)
+ protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, CSVFileFormatProperties csvprops)
throws IOException
{
//estimate output size and number of output blocks (min 1)
- int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(),
+ int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(),
src.getNonZeros(), OutputInfo.CSVOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize());
numPartFiles = Math.max(numPartFiles, 1);
@@ -80,7 +80,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
}
//wait until all tasks have been executed
- List<Future<Object>> rt = pool.invokeAll(tasks);
+ List<Future<Object>> rt = pool.invokeAll(tasks);
pool.shutdown();
//check for exceptions
@@ -101,13 +101,12 @@ public class WriterTextCSVParallel extends WriterTextCSV
private class WriteCSVTask implements Callable<Object>
{
- private JobConf _job = null;
- private FileSystem _fs = null;
- private MatrixBlock _src = null;
- private Path _path =null;
- private int _rl = -1;
- private int _ru = -1;
- private CSVFileFormatProperties _props = null;
+ private final JobConf _job;
+ private final FileSystem _fs;
+ private final MatrixBlock _src;
+ private final Path _path;
+ private final int _rl, _ru;
+ private final CSVFileFormatProperties _props;
public WriteCSVTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru, CSVFileFormatProperties props) {
_path = path;
@@ -120,8 +119,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
}
@Override
- public Object call() throws Exception
- {
+ public Object call() throws Exception {
writeCSVMatrixToFile(_path, _job, _fs, _src, _rl, _ru, _props);
return null;
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
index b0636a3..0438f46 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
@@ -66,7 +66,7 @@ public class WriterTextCell extends MatrixWriter
Path path = new Path( fname );
FileSystem fs = IOUtilFunctions.getFileSystem(path);
try( FSDataOutputStream writer = fs.create(path) ){
- writer.writeBytes("1 1 0");
+ writer.writeBytes(IOUtilFunctions.EMPTY_TEXT_LINE);
}
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
@@ -126,15 +126,13 @@ public class WriterTextCell extends MatrixWriter
br.write( sb.toString() ); //same as append
sb.setLength(0);
}
-
}
}
}
//handle empty result
- if ( src.isEmptyBlock(false) && rl==0 ) {
- br.write("1 1 0\n");
- }
+ if ( src.isEmptyBlock(false) && rl==0 )
+ br.write(IOUtilFunctions.EMPTY_TEXT_LINE);
}
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index 21c42cd..6f73de1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -205,7 +205,8 @@ public class MatrixCharacteristics implements Serializable
public boolean mightHaveEmptyBlocks() {
long singleBlk = Math.max(Math.min(numRows, numRowsPerBlock),1)
* Math.max(Math.min(numColumns, numColumnsPerBlock),1);
- return !nnzKnown() || (nonZero < numRows*numColumns - singleBlk);
+ return !nnzKnown() || numRows==0 || numColumns==0
+ || (nonZero < numRows*numColumns - singleBlk);
}
public static void reorg(MatrixCharacteristics dim, ReorgOperator op,
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java b/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
index c4f9a56..f1c9b35 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
@@ -19,6 +19,7 @@
package org.apache.sysml.runtime.matrix;
+import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.logging.Log;
@@ -61,6 +62,12 @@ public class WriteCSVMR
JobConf job = new JobConf(WriteCSVMR.class);
job.setJobName("WriteCSV-MR");
+ //check for valid output dimensions
+ for( int i=0; i<rlens.length; i++ )
+ if( rlens[i] == 0 || clens[i] == 0 )
+ throw new IOException("Write of matrices with zero"
+ + " rows or columns not supported ("+rlens[i]+"x"+clens[i]+").");
+
byte[] realIndexes=new byte[inputs.length];
for(byte b=0; b<realIndexes.length; b++)
realIndexes[b]=b;
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java b/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java
index 036743f..b51a809 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java
@@ -53,14 +53,18 @@ implements Converter<LongWritable, Text, MatrixIndexes, MatrixCell>
hasValue=false;
return;
}
-
+
//reset the tokenizer
st.reset( str );
//convert text to matrix cell
indexes.setIndexes( st.nextLong(), st.nextLong() );
+ if( indexes.getRowIndex() == 0 || indexes.getColumnIndex() == 0 ) {
+ hasValue = false;
+ return;
+ }
value.setValue( st.nextDouble() );
- hasValue = true;
+ hasValue = true;
}
@Override
@@ -72,14 +76,12 @@ implements Converter<LongWritable, Text, MatrixIndexes, MatrixCell>
public Pair<MatrixIndexes, MatrixCell> next() {
if(!hasValue)
return null;
-
hasValue=false;
return pair;
}
@Override
- public void setBlockSize(int rl, int cl)
- {
-
+ public void setBlockSize(int rl, int cl) {
+ //do nothing
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java
index de34f3a..a7ce655 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java
@@ -79,8 +79,7 @@ public class ReblockMapper extends MapperBase
ReblockInstruction[] reblockInstructions = MRJobConfiguration.getReblockInstructions(job);
//get dimension information
- for(ReblockInstruction ins: reblockInstructions)
- {
+ for(ReblockInstruction ins: reblockInstructions) {
dimensionsIn.put(ins.input, MRJobConfiguration.getMatrixCharacteristicsForInput(job, ins.input));
dimensionsOut.put(ins.output, MRJobConfiguration.getMatrixCharactristicsForReblock(job, ins.output));
emptyBlocks.put(ins.output, ins.outputEmptyBlocks);
@@ -90,7 +89,7 @@ public class ReblockMapper extends MapperBase
//(buffer size divided by max reblocks per input matrix, because those are shared in JVM)
int maxlen = 1;
for( ArrayList<ReblockInstruction> rinst : reblock_instructions )
- maxlen = Math.max(maxlen, rinst.size()); //max reblocks per input
+ maxlen = Math.max(maxlen, rinst.size()); //max reblocks per input
buffersize = ReblockBuffer.DEFAULT_BUFFER_SIZE/maxlen;
}
catch (Exception e)
@@ -106,8 +105,7 @@ public class ReblockMapper extends MapperBase
super.close();
//flush buffered data
- for( Entry<Byte,ReblockBuffer> e : buffer.entrySet() )
- {
+ for( Entry<Byte,ReblockBuffer> e : buffer.entrySet() ) {
ReblockBuffer rbuff = e.getValue();
rbuff.flushBuffer(e.getKey(), cachedCollector);
}
@@ -142,13 +140,12 @@ public class ReblockMapper extends MapperBase
//output part of empty blocks (all mappers contribute for better load balance),
//where mapper responsibility is distributed over row blocks
- long numBlocks = (long)Math.ceil((double)rlen/brlen);
+ long numBlocks = (long)Math.ceil((double)Math.max(rlen,1)/brlen);
long len = (long)Math.ceil((double)numBlocks/numMap);
long start = mapID * len * brlen;
- long end = Math.min((mapID+1) * len * brlen, rlen);
+ long end = Math.min((mapID+1) * len * brlen, Math.max(rlen,1));
for(long i=start, r=start/brlen+1; i<end; i+=brlen, r++)
- for(long j=0, c=1; j<clen; j+=bclen, c++)
- {
+ for(long j=0, c=1; j<Math.max(clen,1); j+=bclen, c++) {
tmpIx.setIndexes(r, c);
cachedCollector.collect(tmpIx, tmpVal);
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/test/java/org/apache/sysml/test/integration/functions/data/WriteReadZeroDimsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/data/WriteReadZeroDimsTest.java b/src/test/java/org/apache/sysml/test/integration/functions/data/WriteReadZeroDimsTest.java
new file mode 100644
index 0000000..c3a8da4
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/data/WriteReadZeroDimsTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.data;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+
+public class WriteReadZeroDimsTest extends AutomatedTestBase // write/read round trip of matrices with zero rows or zero columns
+{
+ private final static String TEST_NAME1 = "ZeroDimDataWrite"; // DML script that writes the matrix
+ private final static String TEST_NAME2 = "ZeroDimDataRead"; // DML script that reads it back and aggregates
+ private final static String TEST_DIR = "functions/data/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + WriteReadZeroDimsTest.class.getSimpleName() + "/";
+ // size of the non-empty dimension; the other dimension is set to 0 (see Type)
+ private final static int rowsM = 1200;
+ private final static int colsM = 1100;
+ // selects which dimension of the written matrix is zero
+ public enum Type{
+ Zero_Rows,
+ Zero_Cols,
+ }
+
+ // registers the write (output R1) and read-back (output R2) test configurations
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R1" }) );
+ addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R2" }) );
+ }
+ // 24 cases: {Zero_Rows, Zero_Cols} x {text, mm, csv, binary} x {CP, SPARK, MR}
+ @Test
+ public void testZeroRowsTextCP() {
+ runZeroDimsTest(Type.Zero_Rows, "text", ExecType.CP);
+ }
+
+ @Test
+ public void testZeroColsTextCP() {
+ runZeroDimsTest(Type.Zero_Cols, "text", ExecType.CP);
+ }
+
+ @Test
+ public void testZeroRowsMmCP() {
+ runZeroDimsTest(Type.Zero_Rows, "mm", ExecType.CP);
+ }
+
+ @Test
+ public void testZeroColsMmCP() {
+ runZeroDimsTest(Type.Zero_Cols, "mm", ExecType.CP);
+ }
+
+ @Test
+ public void testZeroRowsCsvCP() {
+ runZeroDimsTest(Type.Zero_Rows, "csv", ExecType.CP);
+ }
+
+ @Test
+ public void testZeroColsCsvCP() {
+ runZeroDimsTest(Type.Zero_Cols, "csv", ExecType.CP);
+ }
+
+ @Test
+ public void testZeroRowsBinCP() {
+ runZeroDimsTest(Type.Zero_Rows, "binary", ExecType.CP);
+ }
+
+ @Test
+ public void testZeroColsBinCP() {
+ runZeroDimsTest(Type.Zero_Cols, "binary", ExecType.CP);
+ }
+
+ @Test
+ public void testZeroRowsTextSP() {
+ runZeroDimsTest(Type.Zero_Rows, "text", ExecType.SPARK);
+ }
+
+ @Test
+ public void testZeroColsTextSP() {
+ runZeroDimsTest(Type.Zero_Cols, "text", ExecType.SPARK);
+ }
+
+ @Test
+ public void testZeroRowsMmSP() {
+ runZeroDimsTest(Type.Zero_Rows, "mm", ExecType.SPARK);
+ }
+
+ @Test
+ public void testZeroColsMmSP() {
+ runZeroDimsTest(Type.Zero_Cols, "mm", ExecType.SPARK);
+ }
+
+ @Test
+ public void testZeroRowsCsvSP() {
+ runZeroDimsTest(Type.Zero_Rows, "csv", ExecType.SPARK);
+ }
+
+ @Test
+ public void testZeroColsCsvSP() {
+ runZeroDimsTest(Type.Zero_Cols, "csv", ExecType.SPARK);
+ }
+
+ @Test
+ public void testZeroRowsBinSP() {
+ runZeroDimsTest(Type.Zero_Rows, "binary", ExecType.SPARK);
+ }
+
+ @Test
+ public void testZeroColsBinSP() {
+ runZeroDimsTest(Type.Zero_Cols, "binary", ExecType.SPARK);
+ }
+
+ @Test
+ public void testZeroRowsTextMR() {
+ runZeroDimsTest(Type.Zero_Rows, "text", ExecType.MR);
+ }
+
+ @Test
+ public void testZeroColsTextMR() {
+ runZeroDimsTest(Type.Zero_Cols, "text", ExecType.MR);
+ }
+
+ @Test
+ public void testZeroRowsMmMR() {
+ runZeroDimsTest(Type.Zero_Rows, "mm", ExecType.MR);
+ }
+
+ @Test
+ public void testZeroColsMmMR() {
+ runZeroDimsTest(Type.Zero_Cols, "mm", ExecType.MR);
+ }
+
+ @Test
+ public void testZeroRowsCsvMR() {
+ runZeroDimsTest(Type.Zero_Rows, "csv", ExecType.MR);
+ }
+
+ @Test
+ public void testZeroColsCsvMR() {
+ runZeroDimsTest(Type.Zero_Cols, "csv", ExecType.MR);
+ }
+
+ @Test
+ public void testZeroRowsBinMR() {
+ runZeroDimsTest(Type.Zero_Rows, "binary", ExecType.MR);
+ }
+
+ @Test
+ public void testZeroColsBinMR() {
+ runZeroDimsTest(Type.Zero_Cols, "binary", ExecType.MR);
+ }
+ // Round trip for one (type, format, backend): write via TEST_NAME1, then (except csv) read back via TEST_NAME2 and check the sum.
+ private void runZeroDimsTest( Type type, String format, ExecType et )
+ {
+ int rows = (type == Type.Zero_Rows) ? 0 : rowsM;
+ int cols = (type == Type.Zero_Cols) ? 0 : colsM;
+ // map the exec type to a runtime platform (CP runs as HYBRID); the old value is restored in finally
+ RUNTIME_PLATFORM platformOld = rtplatform;
+ switch( et ){
+ case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+ case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+ default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+ }
+ // SPARK runs use a local Spark config; the old flag is restored in finally
+ boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ if( rtplatform == RUNTIME_PLATFORM.SPARK )
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+ try {
+ //run write into format
+ loadTestConfiguration(getTestConfiguration(TEST_NAME1));
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+ programArgs = new String[]{"-args", String.valueOf(rows),
+ String.valueOf(cols), output("R1"), format};
+ runTest(true, format.equals("csv"), null, -1); // NOTE(review): 2nd arg is true only for csv, presumably an "exception expected" flag; confirm against AutomatedTestBase.runTest
+ // read-back and result check are skipped for csv (see NOTE on the runTest call above)
+ //run read from format
+ if( !format.equals("csv") ) {
+ loadTestConfiguration(getTestConfiguration(TEST_NAME2));
+ HOME = SCRIPT_DIR + TEST_DIR; // same value as assigned above
+ fullDMLScriptName = HOME + TEST_NAME2 + ".dml";
+ programArgs = new String[]{"-args", output("R1"), output("R2")};
+ runTest(true, false, null, -1);
+ // the read script appends one row/column of ones, so R2 = 7 * (size of the non-zero dimension)
+ //check overall result
+ double expected = ((type == Type.Zero_Rows) ? colsM : rowsM) * 7;
+ Assert.assertEquals(new Double(expected), // NOTE(review): new Double(..) is deprecated since Java 9; Double.valueOf(..) is the replacement
+ readDMLMatrixFromHDFS("R2").get(new CellIndex(1,1)));
+ }
+ }
+ finally { // restore the global test settings changed above
+ rtplatform = platformOld;
+ DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/test/scripts/functions/data/ZeroDimDataRead.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/data/ZeroDimDataRead.dml b/src/test/scripts/functions/data/ZeroDimDataRead.dml
new file mode 100644
index 0000000..7ebb917
--- /dev/null
+++ b/src/test/scripts/functions/data/ZeroDimDataRead.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1); # $1: input matrix path (in the Java test, the output of ZeroDimDataWrite.dml)
+if( nrow(X) == 0 )
+ X = rbind(X, matrix(1, 1, ncol(X))); # zero rows: append a single row of ones
+else if( ncol(X) == 0 )
+ X = cbind(X, matrix(1, nrow(X), 1)); # zero columns: append a single column of ones
+R = as.matrix(sum(X * 7)); # 1x1 matrix holding 7 * sum(X)
+# $2: output path of the 1x1 result
+write(R, $2);
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/test/scripts/functions/data/ZeroDimDataWrite.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/data/ZeroDimDataWrite.dml b/src/test/scripts/functions/data/ZeroDimDataWrite.dml
new file mode 100644
index 0000000..21174b3
--- /dev/null
+++ b/src/test/scripts/functions/data/ZeroDimDataWrite.dml
@@ -0,0 +1,23 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = matrix(1, $1, $2); # $1 x $2 matrix of ones; the Java test sets one of the two dimensions to 0
+write(X, $3, format=$4); # $3: output path, $4: output format (the Java test uses text, mm, csv, binary)
http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java
index 7b77e4a..1dfd4fe 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java
@@ -42,6 +42,7 @@ import org.junit.runners.Suite;
SequenceTest.class,
VariableTest.class,
WriteMMTest.class,
+ WriteReadZeroDimsTest.class,
WriteTest.class,
})