You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/07/16 15:15:02 UTC
[systemds] branch master updated: [MINOR] Fix metadata on spark binary write w/ non-default blocksizes
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 84f2135 [MINOR] Fix metadata on spark binary write w/ non-default blocksizes
84f2135 is described below
commit 84f21358b6ea677ca2406d800a9995e64077f435
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Fri Jul 16 17:14:37 2021 +0200
[MINOR] Fix metadata on spark binary write w/ non-default blocksizes
This patch fixes incorrectly written metadata files (specifically, the blocksize
of binary-block matrices) when the write is executed through Spark write instructions.
---
.../instructions/spark/RandSPInstruction.java | 2 +-
.../instructions/spark/WriteSPInstruction.java | 32 +++++++++++-----------
.../org/apache/sysds/test/AutomatedTestBase.java | 6 ++++
.../test/functions/io/binary/BlocksizeTest.java | 6 ++--
4 files changed, 26 insertions(+), 20 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
index 421b365..565bf88 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
@@ -754,7 +754,7 @@ public class RandSPInstruction extends UnarySPInstruction {
// Construct BinaryBlock representation
JavaPairRDD<MatrixIndexes, MatrixBlock> mbRDD =
- RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), miRDD, mcOut, true);
+ RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), miRDD, mcOut, true);
//step 5: output handling, incl meta data
sec.getDataCharacteristics(output.getName()).set(mcOut);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
index 3acab8b..bad0d2f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
@@ -178,9 +178,9 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
//get input rdd
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
DataCharacteristics mc = sec.getDataCharacteristics(input1.getName());
-
- if( fmt == FileFormat.MM || fmt == FileFormat.TEXT )
- {
+ DataCharacteristics mcOut = mc; //by reference
+
+ if( fmt == FileFormat.MM || fmt == FileFormat.TEXT ) {
//piggyback nnz maintenance on write
LongAccumulator aNnz = null;
if( !mc.nnzKnown() ) {
@@ -208,16 +208,14 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
if( !mc.nnzKnown() )
mc.setNonZeros( aNnz.value() );
}
- else if( fmt == FileFormat.CSV )
- {
+ else if( fmt == FileFormat.CSV ) {
if( mc.getRows() == 0 || mc.getCols() == 0 ) {
throw new IOException("Write of matrices with zero rows or columns"
+ " not supported ("+mc.getRows()+"x"+mc.getCols()+").");
}
- LongAccumulator aNnz = null;
-
//piggyback nnz computation on actual write
+ LongAccumulator aNnz = null;
if( !mc.nnzKnown() ) {
aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
@@ -234,9 +232,10 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
else if( fmt == FileFormat.BINARY ) {
//reblock output if needed
int blen = Integer.parseInt(input4.getName());
- DataCharacteristics mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
- if( ConfigurationManager.getBlocksize() != blen )
- in1 = RDDConverterUtils.binaryBlockToBinaryBlock(in1, mc, mcOut);
+ boolean nonDefaultBlen = ConfigurationManager.getBlocksize() != blen;
+ if( nonDefaultBlen )
+ in1 = RDDConverterUtils.binaryBlockToBinaryBlock(in1, mc,
+ new MatrixCharacteristics(mc).setBlocksize(blen));
//piggyback nnz computation on actual write
LongAccumulator aNnz = null;
@@ -248,8 +247,10 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
//save binary block rdd on hdfs
in1.saveAsHadoopFile(fname, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
- if(!mc.nnzKnown())
+ if( !mc.nnzKnown() ) //update nnz
mc.setNonZeros(aNnz.value().longValue());
+ if( nonDefaultBlen )
+ mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
}
else if(fmt == FileFormat.LIBSVM) {
if(mc.getRows() == 0 || mc.getCols() == 0) {
@@ -257,17 +258,16 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
"Write of matrices with zero rows or columns" + " not supported (" + mc.getRows() + "x" + mc
.getCols() + ").");
}
-
- LongAccumulator aNnz = null;
-
+
//piggyback nnz computation on actual write
+ LongAccumulator aNnz = null;
if(!mc.nnzKnown()) {
aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
}
JavaRDD<String> out = RDDConverterUtils.binaryBlockToLibsvm(in1,
- mc, (FileFormatPropertiesLIBSVM) formatProperties, true);
+ mc, (FileFormatPropertiesLIBSVM) formatProperties, true);
customSaveTextFile(out, fname, false);
@@ -280,7 +280,7 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
}
// write meta data file
- HDFSTool.writeMetaDataFile (fname + ".mtd", ValueType.FP64, mc, fmt, formatProperties);
+ HDFSTool.writeMetaDataFile(fname + ".mtd", ValueType.FP64, mcOut, fmt, formatProperties);
}
protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, FileFormat fmt, ValueType[] schema)
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 51c0d2a..a4da6e3 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -900,9 +900,15 @@ public abstract class AutomatedTestBase {
}
public static void checkDMLMetaDataFile(String fileName, MatrixCharacteristics mc) {
+ checkDMLMetaDataFile(fileName, mc, false);
+ }
+
+ public static void checkDMLMetaDataFile(String fileName, MatrixCharacteristics mc, boolean checkBlocksize) {
MatrixCharacteristics rmc = readDMLMetaDataFile(fileName);
Assert.assertEquals(mc.getRows(), rmc.getRows());
Assert.assertEquals(mc.getCols(), rmc.getCols());
+ if( checkBlocksize )
+ Assert.assertEquals(mc.getBlocksize(), rmc.getBlocksize());
}
public static MatrixCharacteristics readDMLMetaDataFile(String fileName) {
diff --git a/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java b/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
index 571f817..8ba7e48 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
@@ -129,16 +129,16 @@ public class BlocksizeTest extends AutomatedTestBase
//generate actual dataset
double[][] X = getRandomMatrix(rows, cols, -1.0, 1.0, sparsity, 7);
MatrixBlock mb = DataConverter.convertToMatrixBlock(X);
- MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, inBlksize, inBlksize);
+ MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, inBlksize);
DataConverter.writeMatrixToHDFS(mb, input("X"), FileFormat.BINARY, mc);
HDFSTool.writeMetaDataFile(input("X.mtd"), ValueType.FP64, mc, FileFormat.BINARY);
runTest(true, false, null, -1); //mult 7
//compare matrices
- checkDMLMetaDataFile("X", new MatrixCharacteristics(rows, cols, outBlksize, outBlksize));
+ checkDMLMetaDataFile("X", new MatrixCharacteristics(rows, cols, outBlksize), true);
MatrixBlock mb2 = DataConverter.readMatrixFromHDFS(
- output("X"), FileFormat.BINARY, rows, cols, outBlksize, outBlksize);
+ output("X"), FileFormat.BINARY, rows, cols, outBlksize, -1);
for( int i=0; i<mb.getNumRows(); i++ )
for( int j=0; j<mb.getNumColumns(); j++ ) {
double val1 = mb.quickGetValue(i, j) * 7;