You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/21 00:45:25 UTC
incubator-systemml git commit: [SYSTEMML-946] Fix oom spark
dataframe-matrix/csv-matrix converters
Repository: incubator-systemml
Updated Branches:
refs/heads/master ecf5e1b4c -> 5decbe64b
[SYSTEMML-946] Fix oom spark dataframe-matrix/csv-matrix converters
See details in https://issues.apache.org/jira/browse/SYSTEMML-946.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/5decbe64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/5decbe64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/5decbe64
Branch: refs/heads/master
Commit: 5decbe64b362e29901f15231c672c7b7816ed55a
Parents: ecf5e1b
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Tue Sep 20 17:44:36 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 20 17:44:36 2016 -0700
----------------------------------------------------------------------
.../spark/utils/RDDConverterUtils.java | 47 +++++++++----
.../DataFrameMatrixConversionTest.java | 73 +++++++++++++-------
2 files changed, 83 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5decbe64/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 3ee1ef8..a619a4d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -204,19 +204,19 @@ public class RDDConverterUtils
* @throws DMLRuntimeException
*/
public static JavaPairRDD<MatrixIndexes, MatrixBlock> csvToBinaryBlock(JavaSparkContext sc,
- JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mcOut,
+ JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mc,
boolean hasHeader, String delim, boolean fill, double fillValue)
throws DMLRuntimeException
{
//determine unknown dimensions and sparsity if required
- if( !mcOut.dimsKnown(true) ) {
+ if( !mc.dimsKnown(true) ) {
Accumulator<Double> aNnz = sc.accumulator(0L);
JavaRDD<String> tmp = input.values()
.map(new CSVAnalysisFunction(aNnz, delim));
long rlen = tmp.count() - (hasHeader ? 1 : 0);
long clen = tmp.first().split(delim).length;
long nnz = UtilFunctions.toLong(aNnz.value());
- mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), nnz);
+ mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), nnz);
}
//prepare csv w/ row indexes (sorted by filenames)
@@ -224,9 +224,10 @@ public class RDDConverterUtils
.zipWithIndex(); //zip row index
//convert csv rdd to binary block rdd (w/ partial blocks)
+ boolean sparse = requiresSparseAllocation(prepinput, mc);
JavaPairRDD<MatrixIndexes, MatrixBlock> out =
- prepinput.mapPartitionsToPair(
- new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill, fillValue));
+ prepinput.mapPartitionsToPair(new CSVToBinaryBlockFunction(
+ mc, sparse, hasHeader, delim, fill, fillValue));
//aggregate partial matrix blocks
out = RDDAggregateUtils.mergeByKey( out );
@@ -298,14 +299,16 @@ public class RDDConverterUtils
mc.setBlockSize(ConfigurationManager.getBlocksize());
}
+ //construct or reuse row ids
JavaPairRDD<Row, Long> prepinput = containsID ?
df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
df.javaRDD().zipWithIndex(); //zip row index
//convert dataframe rdd to binary block rdd (w/ partial blocks)
+ boolean sparse = requiresSparseAllocation(prepinput, mc);
JavaPairRDD<MatrixIndexes, MatrixBlock> out =
prepinput.mapPartitionsToPair(
- new DataFrameToBinaryBlockFunction(mc, containsID, isVector));
+ new DataFrameToBinaryBlockFunction(mc, sparse, containsID, isVector));
//aggregate partial matrix blocks
out = RDDAggregateUtils.mergeByKey( out );
@@ -357,6 +360,28 @@ public class RDDConverterUtils
return in.mapToPair(new TextToSerTextFunction());
}
+ /**
+ *
+ * @param in
+ * @param mc
+ * @return
+ */
+ private static boolean requiresSparseAllocation(JavaPairRDD<?,?> in, MatrixCharacteristics mc) {
+ //if nnz unknown or sparse, pick the robust sparse representation
+ if( !mc.nnzKnown() || (mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(
+ mc.getRows(), mc.getCols(), mc.getNonZeros())) ) {
+ return true;
+ }
+
+ //if dense evaluate expected rows per partition to handle wide matrices
+ //(pick sparse representation if fraction of rows per block is less than the sparse threshold)
+ double datasize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
+ double rowsize = OptimizerUtils.estimatePartitionedSizeExactSparsity(1, mc.getCols(),
+ mc.getNumRowBlocks(), mc.getColsPerBlock(), Math.ceil((double)mc.getNonZeros()/mc.getRows()));
+ double partsize = Math.ceil(datasize/in.partitions().size());
+ double blksz = Math.min(mc.getRows(), mc.getRowsPerBlock());
+ return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT;
+ }
/////////////////////////////////
// BINARYBLOCK-SPECIFIC FUNCTIONS
@@ -633,15 +658,14 @@ public class RDDConverterUtils
private boolean _fill = false;
private double _fillValue = 0;
- public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean hasHeader, String delim, boolean fill, double fillValue)
+ public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean sparse, boolean hasHeader, String delim, boolean fill, double fillValue)
{
_rlen = mc.getRows();
_clen = mc.getCols();
_brlen = mc.getRowsPerBlock();
_bclen = mc.getColsPerBlock();
_sparsity = OptimizerUtils.getSparsity(mc);
- _sparse = mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(mc.getRows(),
- mc.getCols(), mc.getNonZeros()) && (!fill || fillValue==0);
+ _sparse = sparse && (!fill || fillValue==0);
_header = hasHeader;
_delim = delim;
_fill = fill;
@@ -879,13 +903,12 @@ public class RDDConverterUtils
private boolean _containsID;
private boolean _isVector;
- public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, boolean containsID, boolean isVector) {
+ public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, boolean sparse, boolean containsID, boolean isVector) {
_rlen = mc.getRows();
_clen = mc.getCols();
_brlen = mc.getRowsPerBlock();
_bclen = mc.getColsPerBlock();
- _sparse = mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(
- mc.getRows(), mc.getCols(), mc.getNonZeros());
+ _sparse = sparse;
_containsID = containsID;
_isVector = isVector;
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5decbe64/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
index e88a867..08511cc 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -45,9 +45,11 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
private final static String TEST_NAME = "DataFrameConversion";
private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameMatrixConversionTest.class.getSimpleName() + "/";
- private final static int rows1 = 2245;
- private final static int cols1 = 745;
- private final static int cols2 = 1264;
+ private final static int rows1 = 2245;
+ private final static int rows3 = 7;
+ private final static int cols1 = 745;
+ private final static int cols2 = 1264;
+ private final static int cols3 = 1003820;
private final static double sparsity1 = 0.9;
private final static double sparsity2 = 0.1;
private final static double eps=0.0000000001;
@@ -60,82 +62,102 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
@Test
public void testVectorConversionSingleDense() {
- testDataFrameConversion(true, true, true, false);
+ testDataFrameConversion(true, cols1, true, false);
}
@Test
public void testVectorConversionSingleDenseUnknown() {
- testDataFrameConversion(true, true, true, true);
+ testDataFrameConversion(true, cols1, true, true);
}
@Test
public void testVectorConversionSingleSparse() {
- testDataFrameConversion(true, true, false, false);
+ testDataFrameConversion(true, cols1, false, false);
}
@Test
public void testVectorConversionSingleSparseUnknown() {
- testDataFrameConversion(true, true, false, true);
+ testDataFrameConversion(true, cols1, false, true);
}
@Test
public void testVectorConversionMultiDense() {
- testDataFrameConversion(true, false, true, false);
+ testDataFrameConversion(true, cols2, true, false);
}
@Test
public void testVectorConversionMultiDenseUnknown() {
- testDataFrameConversion(true, false, true, true);
+ testDataFrameConversion(true, cols2, true, true);
}
@Test
public void testVectorConversionMultiSparse() {
- testDataFrameConversion(true, false, false, false);
+ testDataFrameConversion(true, cols2, false, false);
}
@Test
public void testVectorConversionMultiSparseUnknown() {
- testDataFrameConversion(true, false, false, true);
+ testDataFrameConversion(true, cols2, false, true);
}
@Test
public void testRowConversionSingleDense() {
- testDataFrameConversion(false, true, true, false);
+ testDataFrameConversion(false, cols1, true, false);
}
@Test
public void testRowConversionSingleDenseUnknown() {
- testDataFrameConversion(false, true, true, true);
+ testDataFrameConversion(false, cols1, true, true);
}
@Test
public void testRowConversionSingleSparse() {
- testDataFrameConversion(false, true, false, false);
+ testDataFrameConversion(false, cols1, false, false);
}
@Test
public void testRowConversionSingleSparseUnknown() {
- testDataFrameConversion(false, true, false, true);
+ testDataFrameConversion(false, cols1, false, true);
}
@Test
public void testRowConversionMultiDense() {
- testDataFrameConversion(false, false, true, false);
+ testDataFrameConversion(false, cols2, true, false);
}
@Test
public void testRowConversionMultiDenseUnknown() {
- testDataFrameConversion(false, false, true, true);
+ testDataFrameConversion(false, cols2, true, true);
}
@Test
public void testRowConversionMultiSparse() {
- testDataFrameConversion(false, false, false, false);
+ testDataFrameConversion(false, cols2, false, false);
}
@Test
public void testRowConversionMultiSparseUnknown() {
- testDataFrameConversion(false, false, false, true);
+ testDataFrameConversion(false, cols2, false, true);
+ }
+
+ @Test
+ public void testVectorConversionWideDense() {
+ testDataFrameConversion(true, cols3, true, false);
+ }
+
+ @Test
+ public void testVectorConversionWideDenseUnknown() {
+ testDataFrameConversion(true, cols3, true, true);
+ }
+
+ @Test
+ public void testVectorConversionWideSparse() {
+ testDataFrameConversion(true, cols3, false, false);
+ }
+
+ @Test
+ public void testVectorConversionWideSparseUnknown() {
+ testDataFrameConversion(true, cols3, false, true);
}
/**
@@ -145,7 +167,7 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
* @param dense
* @param unknownDims
*/
- private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
+ private void testDataFrameConversion(boolean vector, int cols, boolean dense, boolean unknownDims) {
boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
@@ -157,12 +179,12 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
//generate input data and setup metadata
- int cols = singleColBlock ? cols1 : cols2;
+ int rows = (cols == cols3) ? rows3 : rows1;
double sparsity = dense ? sparsity1 : sparsity2;
- double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373);
+ double[][] A = getRandomMatrix(rows, cols, -10, 10, sparsity, 2373);
MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
int blksz = ConfigurationManager.getBlocksize();
- MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+ MatrixCharacteristics mc1 = new MatrixCharacteristics(rows, cols, blksz, blksz, mbA.getNonZeros());
MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
//setup spark context
@@ -175,14 +197,15 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
//matrix - dataframe - matrix conversion
DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+ df = ( rows==rows3 ) ? df.repartition(rows) : df;
JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
//get output matrix block
- MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
+ MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows, cols, blksz, blksz, -1);
//compare matrix blocks
double[][] B = DataConverter.convertToDoubleMatrix(mbB);
- TestUtils.compareMatrices(A, B, rows1, cols, eps);
+ TestUtils.compareMatrices(A, B, rows, cols, eps);
}
catch( Exception ex ) {
throw new RuntimeException(ex);