You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/08/26 14:31:58 UTC

[systemds] branch master updated: [SYSTEMDS-3104] Fix parallel csv matrix reader for large dense blocks

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d104a  [SYSTEMDS-3104] Fix parallel csv matrix reader for large dense blocks
f8d104a is described below

commit f8d104ae6be2965e3c3d21327dd0c536f22f9900
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Thu Aug 26 16:30:45 2021 +0200

    [SYSTEMDS-3104] Fix parallel csv matrix reader for large dense blocks
    
    A previous commit a few months ago tried to optimize the performance of
    the parallel CSV read, but corrupted the existing support for reading
    large dense blocks >16GB. This patch fixes the issue in a minimally
    invasive manner.
---
 .../apache/sysds/runtime/data/DenseBlockLDRB.java  |  2 +-
 .../sysds/runtime/io/ReaderTextCSVParallel.java    | 87 +++++++++++-----------
 2 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
index e6c8a7b..2da57e0 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlockLDRB.java
@@ -72,7 +72,7 @@ public abstract class DenseBlockLDRB extends DenseBlock
 			int lastBlockSize = (newBlockSize == rlen ? newBlockSize : rlen % newBlockSize) * odims[0];
 			allocateBlocks(numBlocks);
 			IntStream.range(0, numBlocks)
-					.forEach((i) -> {
+					.forEach(i -> {
 						int length = (i == numBlocks - 1 ? lastBlockSize : newBlockSize *  _odims[0]);
 						allocateBlock(i, length);
 						if (v != 0)
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
index 6d879d1..5e8c5e1 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.io.IOUtilFunctions.CountRowsTask;
@@ -93,7 +94,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
 
 		// allocate output matrix block
 		// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
-		MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, estnnz);
+		MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, blen, estnnz);
 
 		// Second Read Pass (read, parse strings, append to matrix block)
 		readCSVMatrixFromHDFS(splits, path, ret);
@@ -156,8 +157,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
 		}
 	}
 
-	private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path, long rlen, long clen,
-		long estnnz) throws IOException, DMLRuntimeException {
+	private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits,
+		Path path, long rlen, long clen, int blen, long estnnz) throws IOException, DMLRuntimeException {
 		_rLen = 0;
 		_cLen = 0;
 
@@ -225,7 +226,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
 		// allocate target matrix block based on given size;
 		// need to allocate sparse as well since lock-free insert into target
 		long estnnz2 = (estnnz < 0) ? (long) _rLen * _cLen : estnnz;
-		return createOutputMatrixBlock(_rLen, _cLen, _rLen, estnnz2, true, true);
+		return createOutputMatrixBlock(_rLen, _cLen, blen, estnnz2, true, true);
 	}
 
 	private static class SplitOffsetInfos {
@@ -262,8 +263,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
 		protected final boolean _isFirstSplit;
 		protected final int _splitCount;
 
-		protected int row = 0;
-		protected int col = 0;
+		protected int _row = 0;
+		protected int _col = 0;
 
 		public CSVReadTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount) {
 			_split = split;
@@ -286,7 +287,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
 					reader.next(key, value);
 				}
 
-				row = _offsets.getOffsetPerSplit(_splitCount);
+				_row = _offsets.getOffsetPerSplit(_splitCount);
 
 				long nnz = 0;
 				try {
@@ -300,8 +301,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
 			}
 			catch(Exception ex) {
 				// post-mortem error handling and bounds checking
-				if(row < 0 || row + 1 > _rLen || col < 0 || col + 1 > _cLen) {
-					String errMsg = "CSV cell [" + (row + 1) + "," + (col + 1) + "] "
+				if(_row < 0 || _row + 1 > _rLen || _col < 0 || _col + 1 > _cLen) {
+					String errMsg = "CSV cell [" + (_row + 1) + "," + (_col + 1) + "] "
 						+ "out of overall matrix range [1:" + _rLen + ",1:" + _cLen + "]. " + ex.getMessage();
 					throw new IOException(errMsg, ex);
 				}
@@ -317,8 +318,8 @@ public class ReaderTextCSVParallel extends MatrixReader {
 			throws IOException;
 
 		protected void verifyRows(Text value) throws IOException {
-			if(row != (_offsets.getOffsetPerSplit(_splitCount) + _offsets.getLenghtPerSplit(_splitCount))) {
-				throw new IOException("Incorrect number of rows (" + row + ") found in delimited file ("
+			if(_row != (_offsets.getOffsetPerSplit(_splitCount) + _offsets.getLenghtPerSplit(_splitCount))) {
+				throw new IOException("Incorrect number of rows (" + _row + ") found in delimited file ("
 					+ (_offsets.getOffsetPerSplit(_splitCount) + _offsets.getLenghtPerSplit(_splitCount)) + "): "
 					+ value);
 			}
@@ -332,17 +333,18 @@ public class ReaderTextCSVParallel extends MatrixReader {
 		}
 
 		protected long parse(RecordReader<LongWritable, Text> reader, LongWritable key, Text value) throws IOException {
-			double[] a = _dest.getDenseBlockValues();
+			DenseBlock a = _dest.getDenseBlock();
 			double cellValue = 0;
 			long nnz = 0;
 			boolean noFillEmpty = false;
-			int index = row * (int) _cLen;
 
 			while(reader.next(key, value)) { // foreach line
 				final String cellStr = value.toString().trim();
 				final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
-				for(String part : parts) { // foreach cell
-					part = part.trim();
+				double[] avals = a.values(_row);
+				int apos = a.pos(_row);
+				for(int j = 0; j < _cLen; j++) { // foreach cell
+					String part = parts[j].trim();
 					if(part.isEmpty()) {
 						noFillEmpty |= !_props.isFill();
 						cellValue = _props.getFillValue();
@@ -351,15 +353,14 @@ public class ReaderTextCSVParallel extends MatrixReader {
 						cellValue = Double.parseDouble(part);
 					}
 					if(cellValue != 0) {
-						a[index] = cellValue;
+						avals[apos+j] = cellValue;
 						nnz++;
 					}
-					index++;
 				}
 				// sanity checks (number of columns, fill values)
 				IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty);
 				IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
-				row++;
+				_row++;
 			}
 
 			return nnz;
@@ -374,16 +375,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
 		}
 
 		protected long parse(RecordReader<LongWritable, Text> reader, LongWritable key, Text value) throws IOException {
-			double[] a = _dest.getDenseBlockValues();
+			DenseBlock a = _dest.getDenseBlock();
 			double cellValue = 0;
 			boolean noFillEmpty = false;
-			int index = row * (int) _cLen;
 			long nnz = 0;
 			while(reader.next(key, value)) { // foreach line
 				String cellStr = value.toString().trim();
 				String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
-				for(String part : parts) { // foreach cell
-					part = part.trim();
+				double[] avals = a.values(_row);
+				int apos = a.pos(_row);
+				for(int j = 0; j < _cLen; j++) { // foreach cell
+					String part = parts[j].trim();
 					if(part.isEmpty()) {
 						noFillEmpty |= !_props.isFill();
 						cellValue = _props.getFillValue();
@@ -392,15 +394,14 @@ public class ReaderTextCSVParallel extends MatrixReader {
 						cellValue = UtilFunctions.parseToDouble(part, _props.getNAStrings());
 
 					if(cellValue != 0) {
-						a[index] = cellValue;
+						avals[apos+j] = cellValue;
 						nnz++;
 					}
-					index++;
 				}
 				// sanity checks (number of columns, fill values)
 				IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty);
 				IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
-				row++;
+				_row++;
 			}
 			return nnz;
 		}
@@ -421,9 +422,9 @@ public class ReaderTextCSVParallel extends MatrixReader {
 
 				final String cellStr = value.toString().trim();
 				final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
-				col = 0;
-				sb.allocate(row);
-				SparseRow r = sb.get(row);
+				_col = 0;
+				sb.allocate(_row);
+				SparseRow r = sb.get(_row);
 
 				for(String part : parts) {
 					part = part.trim();
@@ -436,17 +437,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
 					}
 
 					if(cellValue != 0) {
-						r.append(col, cellValue);
+						r.append(_col, cellValue);
 						nnz++;
 					}
-					col++;
+					_col++;
 				}
 
 				// sanity checks (number of columns, fill values)
 				IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty);
 				IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
 
-				row++;
+				_row++;
 			}
 			return nnz;
 		}
@@ -463,11 +464,11 @@ public class ReaderTextCSVParallel extends MatrixReader {
 			double cellValue = 0;
 			boolean noFillEmpty = false;
 			while(reader.next(key, value)) {
-				col = 0;
+				_col = 0;
 				final String cellStr = value.toString().trim();
 				final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
-				sb.allocate(row);
-				SparseRow r = sb.get(row);
+				sb.allocate(_row);
+				SparseRow r = sb.get(_row);
 				for(String part : parts) {
 					part = part.trim();
 					if(part.isEmpty()) {
@@ -479,17 +480,17 @@ public class ReaderTextCSVParallel extends MatrixReader {
 					}
 
 					if(cellValue != 0) {
-						r.append(col, cellValue);
+						r.append(_col, cellValue);
 						nnz++;
 					}
-					col++;
+					_col++;
 				}
 
 				// sanity checks (number of columns, fill values)
 				IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty);
 				IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
 
-				row++;
+				_row++;
 			}
 			return nnz;
 		}
@@ -506,25 +507,25 @@ public class ReaderTextCSVParallel extends MatrixReader {
 			long nnz = 0;
 			double cellValue = 0;
 			while(reader.next(key, value)) {
-				col = 0;
+				_col = 0;
 				final String cellStr = value.toString().trim();
 				final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim());
-				sb.allocate(row);
-				SparseRow r = sb.get(row);
+				sb.allocate(_row);
+				SparseRow r = sb.get(_row);
 				for(String part : parts) {
 					if(!part.isEmpty()) {
 						cellValue = Double.parseDouble(part);
 						if(cellValue != 0) {
-							r.append(col, cellValue);
+							r.append(_col, cellValue);
 							nnz++;
 						}
 					}
-					col++;
+					_col++;
 				}
 
 				IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen);
 
-				row++;
+				_row++;
 			}
 			return nnz;
 		}