You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/08/26 14:31:58 UTC
[systemds] branch master updated: [SYSTEMDS-3104] Fix parallel csv
matrix reader for large dense blocks
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new f8d104a [SYSTEMDS-3104] Fix parallel csv matrix reader for large dense blocks
f8d104a is described below
commit f8d104ae6be2965e3c3d21327dd0c536f22f9900
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Thu Aug 26 16:30:45 2021 +0200
[SYSTEMDS-3104] Fix parallel csv matrix reader for large dense blocks
A previous commit a few months ago tried to optimize the performance of
the parallel CSV read, but broke the existing support for reading
large dense blocks >16GB. This patch fixes the issue in a minimally
invasive manner.
---
.../apache/sysds/runtime/data/DenseBlockLDRB.java | 2 +-
.../sysds/runtime/io/ReaderTextCSVParallel.java | 87 +++++++++++-----------
2 files changed, 45 insertions(+), 44 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
index e6c8a7b..2da57e0 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
@@ -72,7 +72,7 @@ public abstract class DenseBlockLDRB extends DenseBlock
int lastBlockSize = (newBlockSize == rlen ? newBlockSize : rlen % newBlockSize) * odims[0];
allocateBlocks(numBlocks);
IntStream.range(0, numBlocks)
- .forEach((i) -> {
+ .forEach(i -> {
int length = (i == numBlocks - 1 ? lastBlockSize : newBlockSize * _odims[0]);
allocateBlock(i, length);
if (v != 0)
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
index 6d879d1..5e8c5e1 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.io.IOUtilFunctions.CountRowsTask;
@@ -93,7 +94,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
// allocate output matrix block
// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
- MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, estnnz);
+ MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, blen, estnnz);
// Second Read Pass (read, parse strings, append to matrix block)
readCSVMatrixFromHDFS(splits, path, ret);
@@ -156,8 +157,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
}
- private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path, long rlen, long clen,
- long estnnz) throws IOException, DMLRuntimeException {
+ private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits,
+ Path path, long rlen, long clen, int blen, long estnnz) throws IOException, DMLRuntimeException {
_rLen = 0;
_cLen = 0;
@@ -225,7 +226,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
// allocate target matrix block based on given size;
// need to allocate sparse as well since lock-free insert into target
long estnnz2 = (estnnz < 0) ? (long) _rLen * _cLen : estnnz;
- return createOutputMatrixBlock(_rLen, _cLen, _rLen, estnnz2, true, true);
+ return createOutputMatrixBlock(_rLen, _cLen, blen, estnnz2, true, true);
}
private static class SplitOffsetInfos {
@@ -262,8 +263,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
protected final boolean _isFirstSplit;
protected final int _splitCount;
- protected int row = 0;
- protected int col = 0;
+ protected int _row = 0;
+ protected int _col = 0;
public CSVReadTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount) {
_split = split;
@@ -286,7 +287,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
reader.next(key, value);
}
- row = _offsets.getOffsetPerSplit(_splitCount);
+ _row = _offsets.getOffsetPerSplit(_splitCount);
long nnz = 0;
try {
@@ -300,8 +301,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
catch(Exception ex) {
// post-mortem error handling and bounds checking
- if(row < 0 || row + 1 > _rLen || col < 0 || col + 1 > _cLen) {
- String errMsg = "CSV cell [" + (row + 1) + "," + (col + 1) + "] "
+ if(_row < 0 || _row + 1 > _rLen || _col < 0 || _col + 1 > _cLen) {
+ String errMsg = "CSV cell [" + (_row + 1) + "," + (_col + 1) + "] "
+ "out of overall matrix range [1:" + _rLen + ",1:" + _cLen + "]. " + ex.getMessage();
throw new IOException(errMsg, ex);
}
@@ -317,8 +318,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
throws IOException;
protected void verifyRows(Text value) throws IOException {
- if(row != (_offsets.getOffsetPerSplit(_splitCount) + _offsets.getLenghtPerSplit(_splitCount))) {
- throw new IOException("Incorrect number of rows (" + row + ") found in delimited file ("
+ if(_row != (_offsets.getOffsetPerSplit(_splitCount) + _offsets.getLenghtPerSplit(_splitCount))) {
+ throw new IOException("Incorrect number of rows (" + _row + ") found in delimited file ("
+ (_offsets.getOffsetPerSplit(_splitCount) + _offsets.getLenghtPerSplit(_splitCount)) + "): "
+ value);
}
@@ -332,17 +333,18 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
protected long parse(RecordReader<LongWritable, Text> reader, LongWritable key, Text value) throws IOException {
- double[] a = _dest.getDenseBlockValues();
+ DenseBlock a = _dest.getDenseBlock();
double cellValue = 0;
long nnz = 0;
boolean noFillEmpty = false;
- int index = row * (int) _cLen;
while(reader.next(key, value)) { // foreach line
final String cellStr = value.toString().trim();
final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
- for(String part : parts) { // foreach cell
- part = part.trim();
+ double[] avals = a.values(_row);
+ int apos = a.pos(_row);
+ for(int j = 0; j < _cLen; j++) { // foreach cell
+ String part = parts[j].trim();
if(part.isEmpty()) {
noFillEmpty |= !_props.isFill();
cellValue = _props.getFillValue();
@@ -351,15 +353,14 @@ public class ReaderTextCSVParallel extends MatrixReader {
cellValue = Double.parseDouble(part);
}
if(cellValue != 0) {
- a[index] = cellValue;
+ avals[apos+j] = cellValue;
nnz++;
}
- index++;
}
// sanity checks (number of columns, fill values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
- row++;
+ _row++;
}
return nnz;
@@ -374,16 +375,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
protected long parse(RecordReader<LongWritable, Text> reader, LongWritable key, Text value) throws IOException {
- double[] a = _dest.getDenseBlockValues();
+ DenseBlock a = _dest.getDenseBlock();
double cellValue = 0;
boolean noFillEmpty = false;
- int index = row * (int) _cLen;
long nnz = 0;
while(reader.next(key, value)) { // foreach line
String cellStr = value.toString().trim();
String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
- for(String part : parts) { // foreach cell
- part = part.trim();
+ double[] avals = a.values(_row);
+ int apos = a.pos(_row);
+ for(int j = 0; j < _cLen; j++) { // foreach cell
+ String part = parts[j].trim();
if(part.isEmpty()) {
noFillEmpty |= !_props.isFill();
cellValue = _props.getFillValue();
@@ -392,15 +394,14 @@ public class ReaderTextCSVParallel extends MatrixReader {
cellValue = UtilFunctions.parseToDouble(part, _props.getNAStrings());
if(cellValue != 0) {
- a[index] = cellValue;
+ avals[apos+j] = cellValue;
nnz++;
}
- index++;
}
// sanity checks (number of columns, fill values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
- row++;
+ _row++;
}
return nnz;
}
@@ -421,9 +422,9 @@ public class ReaderTextCSVParallel extends MatrixReader {
final String cellStr = value.toString().trim();
final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
- col = 0;
- sb.allocate(row);
- SparseRow r = sb.get(row);
+ _col = 0;
+ sb.allocate(_row);
+ SparseRow r = sb.get(_row);
for(String part : parts) {
part = part.trim();
@@ -436,17 +437,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
if(cellValue != 0) {
- r.append(col, cellValue);
+ r.append(_col, cellValue);
nnz++;
}
- col++;
+ _col++;
}
// sanity checks (number of columns, fill values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
- row++;
+ _row++;
}
return nnz;
}
@@ -463,11 +464,11 @@ public class ReaderTextCSVParallel extends MatrixReader {
double cellValue = 0;
boolean noFillEmpty = false;
while(reader.next(key, value)) {
- col = 0;
+ _col = 0;
final String cellStr = value.toString().trim();
final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
- sb.allocate(row);
- SparseRow r = sb.get(row);
+ sb.allocate(_row);
+ SparseRow r = sb.get(_row);
for(String part : parts) {
part = part.trim();
if(part.isEmpty()) {
@@ -479,17 +480,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
if(cellValue != 0) {
- r.append(col, cellValue);
+ r.append(_col, cellValue);
nnz++;
}
- col++;
+ _col++;
}
// sanity checks (number of columns, fill values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty);
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
- row++;
+ _row++;
}
return nnz;
}
@@ -506,25 +507,25 @@ public class ReaderTextCSVParallel extends MatrixReader {
long nnz = 0;
double cellValue = 0;
while(reader.next(key, value)) {
- col = 0;
+ _col = 0;
final String cellStr = value.toString().trim();
final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
- sb.allocate(row);
- SparseRow r = sb.get(row);
+ sb.allocate(_row);
+ SparseRow r = sb.get(_row);
for(String part : parts) {
if(!part.isEmpty()) {
cellValue = Double.parseDouble(part);
if(cellValue != 0) {
- r.append(col, cellValue);
+ r.append(_col, cellValue);
nnz++;
}
}
- col++;
+ _col++;
}
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
- row++;
+ _row++;
}
return nnz;
}