You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/21 00:45:25 UTC

incubator-systemml git commit: [SYSTEMML-946] Fix oom spark dataframe-matrix/csv-matrix converters

Repository: incubator-systemml
Updated Branches:
  refs/heads/master ecf5e1b4c -> 5decbe64b


[SYSTEMML-946] Fix oom spark dataframe-matrix/csv-matrix converters

See details in https://issues.apache.org/jira/browse/SYSTEMML-946.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/5decbe64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/5decbe64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/5decbe64

Branch: refs/heads/master
Commit: 5decbe64b362e29901f15231c672c7b7816ed55a
Parents: ecf5e1b
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Tue Sep 20 17:44:36 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 20 17:44:36 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtils.java          | 47 +++++++++----
 .../DataFrameMatrixConversionTest.java          | 73 +++++++++++++-------
 2 files changed, 83 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5decbe64/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 3ee1ef8..a619a4d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -204,19 +204,19 @@ public class RDDConverterUtils
 	 * @throws DMLRuntimeException
 	 */
 	public static JavaPairRDD<MatrixIndexes, MatrixBlock> csvToBinaryBlock(JavaSparkContext sc,
-			JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mcOut, 
+			JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mc, 
 			boolean hasHeader, String delim, boolean fill, double fillValue) 
 		throws DMLRuntimeException 
 	{
 		//determine unknown dimensions and sparsity if required
-		if( !mcOut.dimsKnown(true) ) {
+		if( !mc.dimsKnown(true) ) {
 			Accumulator<Double> aNnz = sc.accumulator(0L);
 			JavaRDD<String> tmp = input.values()
 					.map(new CSVAnalysisFunction(aNnz, delim));
 			long rlen = tmp.count() - (hasHeader ? 1 : 0);
 			long clen = tmp.first().split(delim).length;
 			long nnz = UtilFunctions.toLong(aNnz.value());
-			mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), nnz);
+			mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), nnz);
 		}
 		
 		//prepare csv w/ row indexes (sorted by filenames)
@@ -224,9 +224,10 @@ public class RDDConverterUtils
 				.zipWithIndex(); //zip row index
 		
 		//convert csv rdd to binary block rdd (w/ partial blocks)
+		boolean sparse = requiresSparseAllocation(prepinput, mc);
 		JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
-				prepinput.mapPartitionsToPair(
-					new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill, fillValue));
+				prepinput.mapPartitionsToPair(new CSVToBinaryBlockFunction(
+						mc, sparse, hasHeader, delim, fill, fillValue));
 		
 		//aggregate partial matrix blocks
 		out = RDDAggregateUtils.mergeByKey( out ); 
@@ -298,14 +299,16 @@ public class RDDConverterUtils
 			mc.setBlockSize(ConfigurationManager.getBlocksize());
 		}
 		
+		//construct or reuse row ids
 		JavaPairRDD<Row, Long> prepinput = containsID ?
 				df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
 				df.javaRDD().zipWithIndex(); //zip row index
 		
 		//convert csv rdd to binary block rdd (w/ partial blocks)
+		boolean sparse = requiresSparseAllocation(prepinput, mc);
 		JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
 				prepinput.mapPartitionsToPair(
-					new DataFrameToBinaryBlockFunction(mc, containsID, isVector));
+					new DataFrameToBinaryBlockFunction(mc, sparse, containsID, isVector));
 		
 		//aggregate partial matrix blocks
 		out = RDDAggregateUtils.mergeByKey( out ); 
@@ -357,6 +360,28 @@ public class RDDConverterUtils
 		return in.mapToPair(new TextToSerTextFunction());
 	}
 
+	/**
+	 * Decides whether the rdd-to-binary-block converter functions should use a sparse representation.
+	 * @param in input rdd; only its number of partitions is read
+	 * @param mc matrix characteristics (dimensions, block sizes, number of non-zeros)
+	 * @return true if nnz is unknown, the matrix evaluates as sparse in memory, or partitions cover few rows per block
+	 */
+	private static boolean requiresSparseAllocation(JavaPairRDD<?,?> in, MatrixCharacteristics mc) {
+		//if nnz is unknown or the matrix is sparse, pick the robust sparse representation
+		if( !mc.nnzKnown() || (mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(
+			mc.getRows(), mc.getCols(), mc.getNonZeros())) ) {
+			return true;
+		}
+		
+		//if dense, estimate the expected number of rows per partition to handle wide matrices
+		//(pick sparse representation if fraction of rows per block is less than the sparsity threshold)
+		double datasize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
+		double rowsize = OptimizerUtils.estimatePartitionedSizeExactSparsity(1, mc.getCols(),
+				mc.getNumRowBlocks(), mc.getColsPerBlock(), Math.ceil((double)mc.getNonZeros()/mc.getRows()));
+		double partsize = Math.ceil(datasize/in.partitions().size());
+		double blksz = Math.min(mc.getRows(), mc.getRowsPerBlock());
+		return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT;
+	}
 	
 	/////////////////////////////////
 	// BINARYBLOCK-SPECIFIC FUNCTIONS
@@ -633,15 +658,14 @@ public class RDDConverterUtils
 		private boolean _fill = false;
 		private double _fillValue = 0;
 		
-		public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean hasHeader, String delim, boolean fill, double fillValue)
+		public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean sparse, boolean hasHeader, String delim, boolean fill, double fillValue)
 		{
 			_rlen = mc.getRows();
 			_clen = mc.getCols();
 			_brlen = mc.getRowsPerBlock();
 			_bclen = mc.getColsPerBlock();
 			_sparsity = OptimizerUtils.getSparsity(mc);
-			_sparse = mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(mc.getRows(), 
-					mc.getCols(), mc.getNonZeros()) && (!fill || fillValue==0);
+			_sparse = sparse && (!fill || fillValue==0);
 			_header = hasHeader;
 			_delim = delim;
 			_fill = fill;
@@ -879,13 +903,12 @@ public class RDDConverterUtils
 		private boolean _containsID;
 		private boolean _isVector;
 		
-		public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, boolean containsID, boolean isVector) {
+		public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, boolean sparse, boolean containsID, boolean isVector) {
 			_rlen = mc.getRows();
 			_clen = mc.getCols();
 			_brlen = mc.getRowsPerBlock();
 			_bclen = mc.getColsPerBlock();
-			_sparse = mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(
-					mc.getRows(), mc.getCols(), mc.getNonZeros());
+			_sparse = sparse;
 			_containsID = containsID;
 			_isVector = isVector;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5decbe64/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
index e88a867..08511cc 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -45,9 +45,11 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
 	private final static String TEST_NAME = "DataFrameConversion";
 	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameMatrixConversionTest.class.getSimpleName() + "/";
 
-	private final static int  rows1 = 2245;
-	private final static int  cols1 = 745;
-	private final static int  cols2 = 1264;
+	private final static int rows1 = 2245;
+	private final static int rows3 = 7;
+	private final static int cols1 = 745;
+	private final static int cols2 = 1264;
+	private final static int cols3 = 1003820;
 	private final static double sparsity1 = 0.9;
 	private final static double sparsity2 = 0.1;
 	private final static double eps=0.0000000001;
@@ -60,82 +62,102 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
 	
 	@Test
 	public void testVectorConversionSingleDense() {
-		testDataFrameConversion(true, true, true, false);
+		testDataFrameConversion(true, cols1, true, false);
 	}
 	
 	@Test
 	public void testVectorConversionSingleDenseUnknown() {
-		testDataFrameConversion(true, true, true, true);
+		testDataFrameConversion(true, cols1, true, true);
 	}
 	
 	@Test
 	public void testVectorConversionSingleSparse() {
-		testDataFrameConversion(true, true, false, false);
+		testDataFrameConversion(true, cols1, false, false);
 	}
 	
 	@Test
 	public void testVectorConversionSingleSparseUnknown() {
-		testDataFrameConversion(true, true, false, true);
+		testDataFrameConversion(true, cols1, false, true);
 	}
 	
 	@Test
 	public void testVectorConversionMultiDense() {
-		testDataFrameConversion(true, false, true, false);
+		testDataFrameConversion(true, cols2, true, false);
 	}
 	
 	@Test
 	public void testVectorConversionMultiDenseUnknown() {
-		testDataFrameConversion(true, false, true, true);
+		testDataFrameConversion(true, cols2, true, true);
 	}
 	
 	@Test
 	public void testVectorConversionMultiSparse() {
-		testDataFrameConversion(true, false, false, false);
+		testDataFrameConversion(true, cols2, false, false);
 	}
 	
 	@Test
 	public void testVectorConversionMultiSparseUnknown() {
-		testDataFrameConversion(true, false, false, true);
+		testDataFrameConversion(true, cols2, false, true);
 	}
 
 	@Test
 	public void testRowConversionSingleDense() {
-		testDataFrameConversion(false, true, true, false);
+		testDataFrameConversion(false, cols1, true, false);
 	}
 	
 	@Test
 	public void testRowConversionSingleDenseUnknown() {
-		testDataFrameConversion(false, true, true, true);
+		testDataFrameConversion(false, cols1, true, true);
 	}
 	
 	@Test
 	public void testRowConversionSingleSparse() {
-		testDataFrameConversion(false, true, false, false);
+		testDataFrameConversion(false, cols1, false, false);
 	}
 	
 	@Test
 	public void testRowConversionSingleSparseUnknown() {
-		testDataFrameConversion(false, true, false, true);
+		testDataFrameConversion(false, cols1, false, true);
 	}
 	
 	@Test
 	public void testRowConversionMultiDense() {
-		testDataFrameConversion(false, false, true, false);
+		testDataFrameConversion(false, cols2, true, false);
 	}
 	
 	@Test
 	public void testRowConversionMultiDenseUnknown() {
-		testDataFrameConversion(false, false, true, true);
+		testDataFrameConversion(false, cols2, true, true);
 	}
 	
 	@Test
 	public void testRowConversionMultiSparse() {
-		testDataFrameConversion(false, false, false, false);
+		testDataFrameConversion(false, cols2, false, false);
 	}
 	
 	@Test
 	public void testRowConversionMultiSparseUnknown() {
-		testDataFrameConversion(false, false, false, true);
+		testDataFrameConversion(false, cols2, false, true);
+	}
+	
+	@Test
+	public void testVectorConversionWideDense() {
+		testDataFrameConversion(true, cols3, true, false);
+	}
+	
+	@Test
+	public void testVectorConversionWideDenseUnknown() {
+		testDataFrameConversion(true, cols3, true, true);
+	}
+	
+	@Test
+	public void testVectorConversionWideSparse() {
+		testDataFrameConversion(true, cols3, false, false);
+	}
+	
+	@Test
+	public void testVectorConversionWideSparseUnknown() {
+		testDataFrameConversion(true, cols3, false, true);
 	}
 	
 	/**
@@ -145,7 +167,7 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
 	 * @param dense
 	 * @param unknownDims
 	 */
-	private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
+	private void testDataFrameConversion(boolean vector, int cols, boolean dense, boolean unknownDims) {
 		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
 		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
 
@@ -157,12 +179,12 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
 			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
 			
 			//generate input data and setup metadata
-			int cols = singleColBlock ? cols1 : cols2;
+			int rows = (cols == cols3) ? rows3 : rows1;
 			double sparsity = dense ? sparsity1 : sparsity2; 
-			double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
+			double[][] A = getRandomMatrix(rows, cols, -10, 10, sparsity, 2373); 
 			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
 			int blksz = ConfigurationManager.getBlocksize();
-			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows, cols, blksz, blksz, mbA.getNonZeros());
 			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
 			
 			//setup spark context
@@ -175,14 +197,15 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
 			
 			//matrix - dataframe - matrix conversion
 			DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+			df = ( rows==rows3 ) ? df.repartition(rows) : df;
 			JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
 			
 			//get output matrix block
-			MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
+			MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows, cols, blksz, blksz, -1);
 			
 			//compare matrix blocks
 			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
-			TestUtils.compareMatrices(A, B, rows1, cols, eps);
+			TestUtils.compareMatrices(A, B, rows, cols, eps);
 		}
 		catch( Exception ex ) {
 			throw new RuntimeException(ex);