Posted to commits@systemds.apache.org by mb...@apache.org on 2021/07/16 15:15:02 UTC

[systemds] branch master updated: [MINOR] Fix metadata on spark binary write w/ non-default blocksizes

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 84f2135  [MINOR] Fix metadata on spark binary write w/ non-default blocksizes
84f2135 is described below

commit 84f21358b6ea677ca2406d800a9995e64077f435
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Fri Jul 16 17:14:37 2021 +0200

    [MINOR] Fix metadata on spark binary write w/ non-default blocksizes
    
    This patch fixes incorrectly written metadata files (the blocksize of
    binary block matrices) when writes are invoked through Spark write
    instructions.
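
For illustration, a minimal sketch of the failure mode being fixed (hypothetical
blocksize value; identifiers match the diff below): before this patch, the .mtd
file was written from the input characteristics mc, so a reblock to a
non-default blocksize was not reflected in the metadata.

    //requested blocksize differs from the configured default
    int blen = 2000; //hypothetical non-default blocksize
    DataCharacteristics mc = sec.getDataCharacteristics(input1.getName());
    //before: metadata kept the stale blocksize of mc
    //HDFSTool.writeMetaDataFile(fname + ".mtd", ValueType.FP64, mc, fmt, formatProperties);
    //after: output characteristics carry the written blocksize
    DataCharacteristics mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
    HDFSTool.writeMetaDataFile(fname + ".mtd", ValueType.FP64, mcOut, fmt, formatProperties);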
---
 .../instructions/spark/RandSPInstruction.java      |  2 +-
 .../instructions/spark/WriteSPInstruction.java     | 32 +++++++++++-----------
 .../org/apache/sysds/test/AutomatedTestBase.java   |  6 ++++
 .../test/functions/io/binary/BlocksizeTest.java    |  6 ++--
 4 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
index 421b365..565bf88 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
@@ -754,7 +754,7 @@ public class RandSPInstruction extends UnarySPInstruction {
 		
 		// Construct BinaryBlock representation
 		JavaPairRDD<MatrixIndexes, MatrixBlock> mbRDD = 
-				RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), miRDD, mcOut, true);
+			RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), miRDD, mcOut, true);
 		
 		//step 5: output handling, incl meta data
 		sec.getDataCharacteristics(output.getName()).set(mcOut);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
index 3acab8b..bad0d2f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
@@ -178,9 +178,9 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
 		//get input rdd
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
 		DataCharacteristics mc = sec.getDataCharacteristics(input1.getName());
-
-		if( fmt == FileFormat.MM || fmt == FileFormat.TEXT )
-		{
+		DataCharacteristics mcOut = mc; //by reference
+		
+		if( fmt == FileFormat.MM || fmt == FileFormat.TEXT ) {
 			//piggyback nnz maintenance on write
 			LongAccumulator aNnz = null;
 			if( !mc.nnzKnown() ) {
@@ -208,16 +208,14 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
 			if( !mc.nnzKnown() )
 				mc.setNonZeros( aNnz.value() );
 		}
-		else if( fmt == FileFormat.CSV )
-		{
+		else if( fmt == FileFormat.CSV ) {
 			if( mc.getRows() == 0 || mc.getCols() == 0 ) {
 				throw new IOException("Write of matrices with zero rows or columns"
 					+ " not supported ("+mc.getRows()+"x"+mc.getCols()+").");
 			}
 
-			LongAccumulator aNnz = null;
-
 			//piggyback nnz computation on actual write
+			LongAccumulator aNnz = null;
 			if( !mc.nnzKnown() ) {
 				aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
 				in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
@@ -234,9 +232,10 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
 		else if( fmt == FileFormat.BINARY ) {
 			//reblock output if needed
 			int blen = Integer.parseInt(input4.getName());
-			DataCharacteristics mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
-			if( ConfigurationManager.getBlocksize() != blen )
-				in1 = RDDConverterUtils.binaryBlockToBinaryBlock(in1, mc, mcOut);
+			boolean nonDefaultBlen = ConfigurationManager.getBlocksize() != blen;
+			if( nonDefaultBlen )
+				in1 = RDDConverterUtils.binaryBlockToBinaryBlock(in1, mc,
+					new MatrixCharacteristics(mc).setBlocksize(blen));
 			
 			//piggyback nnz computation on actual write
 			LongAccumulator aNnz = null;
@@ -248,8 +247,10 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
 			//save binary block rdd on hdfs
 			in1.saveAsHadoopFile(fname, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
 
-			if(!mc.nnzKnown())
+			if( !mc.nnzKnown() ) //update nnz
 				mc.setNonZeros(aNnz.value().longValue());
+			if( nonDefaultBlen )
+				mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
 		}
 		else if(fmt == FileFormat.LIBSVM) {
 			if(mc.getRows() == 0 || mc.getCols() == 0) {
@@ -257,17 +258,16 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
 					"Write of matrices with zero rows or columns" + " not supported (" + mc.getRows() + "x" + mc
 					.getCols() + ").");
 			}
-
-			LongAccumulator aNnz = null;
-
+			
 			//piggyback nnz computation on actual write
+			LongAccumulator aNnz = null;
 			if(!mc.nnzKnown()) {
 				aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
 				in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
 			}
 
 			JavaRDD<String> out = RDDConverterUtils.binaryBlockToLibsvm(in1, 
-					mc, (FileFormatPropertiesLIBSVM) formatProperties, true);
+				mc, (FileFormatPropertiesLIBSVM) formatProperties, true);
 
 			customSaveTextFile(out, fname, false);
 
@@ -280,7 +280,7 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
 		}
 
 		// write meta data file
-		HDFSTool.writeMetaDataFile (fname + ".mtd", ValueType.FP64, mc, fmt, formatProperties);
+		HDFSTool.writeMetaDataFile(fname + ".mtd", ValueType.FP64, mcOut, fmt, formatProperties);
 	}
 
 	protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, FileFormat fmt, ValueType[] schema)
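
In short, the binary write path now tracks the output characteristics
separately; a condensed sketch of the hunks above (not a verbatim excerpt):

    DataCharacteristics mcOut = mc; //by reference; unchanged for default blocksize
    int blen = Integer.parseInt(input4.getName());
    boolean nonDefaultBlen = ConfigurationManager.getBlocksize() != blen;
    if( nonDefaultBlen ) //reblock into the requested blocksize
        in1 = RDDConverterUtils.binaryBlockToBinaryBlock(in1, mc,
            new MatrixCharacteristics(mc).setBlocksize(blen));
    //... save rdd, piggyback nnz maintenance ...
    if( nonDefaultBlen ) //metadata picks up the written blocksize
        mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
    HDFSTool.writeMetaDataFile(fname + ".mtd", ValueType.FP64, mcOut, fmt, formatProperties);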
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 51c0d2a..a4da6e3 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -900,9 +900,15 @@ public abstract class AutomatedTestBase {
 	}
 
 	public static void checkDMLMetaDataFile(String fileName, MatrixCharacteristics mc) {
+		checkDMLMetaDataFile(fileName, mc, false);
+	}
+	
+	public static void checkDMLMetaDataFile(String fileName, MatrixCharacteristics mc, boolean checkBlocksize) {
 		MatrixCharacteristics rmc = readDMLMetaDataFile(fileName);
 		Assert.assertEquals(mc.getRows(), rmc.getRows());
 		Assert.assertEquals(mc.getCols(), rmc.getCols());
+		if( checkBlocksize )
+			Assert.assertEquals(mc.getBlocksize(), rmc.getBlocksize());
 	}
 
 	public static MatrixCharacteristics readDMLMetaDataFile(String fileName) {
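
The new overload keeps the blocksize check opt-in, so existing call sites are
unaffected; a usage sketch (arguments as in the test below):

    //rows/cols only, blocksize check off by default
    checkDMLMetaDataFile("X", new MatrixCharacteristics(rows, cols, outBlksize));
    //opt-in: additionally assert the blocksize recorded in the .mtd file
    checkDMLMetaDataFile("X", new MatrixCharacteristics(rows, cols, outBlksize), true);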
diff --git a/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java b/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
index 571f817..8ba7e48 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/binary/BlocksizeTest.java
@@ -129,16 +129,16 @@ public class BlocksizeTest extends AutomatedTestBase
 			//generate actual dataset 
 			double[][] X = getRandomMatrix(rows, cols, -1.0, 1.0, sparsity, 7); 
 			MatrixBlock mb = DataConverter.convertToMatrixBlock(X);
-			MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, inBlksize, inBlksize);
+			MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, inBlksize);
 			DataConverter.writeMatrixToHDFS(mb, input("X"), FileFormat.BINARY, mc);
 			HDFSTool.writeMetaDataFile(input("X.mtd"), ValueType.FP64, mc, FileFormat.BINARY);
 			
 			runTest(true, false, null, -1); //mult 7
 			
 			//compare matrices 
-			checkDMLMetaDataFile("X", new MatrixCharacteristics(rows, cols, outBlksize, outBlksize));
+			checkDMLMetaDataFile("X", new MatrixCharacteristics(rows, cols, outBlksize), true);
 			MatrixBlock mb2 = DataConverter.readMatrixFromHDFS(
-				output("X"), FileFormat.BINARY, rows, cols, outBlksize, outBlksize);
+				output("X"), FileFormat.BINARY, rows, cols, outBlksize, -1);
 			for( int i=0; i<mb.getNumRows(); i++ )
 				for( int j=0; j<mb.getNumColumns(); j++ ) {
 					double val1 = mb.quickGetValue(i, j) * 7;
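
Note that SystemDS uses square blocks, so MatrixCharacteristics carries a single
blocksize; the test accordingly drops the legacy separate row/column blocksizes
(sketch of the updated calls, assuming the trailing -1 column blocksize is
ignored for binary reads):

    MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, inBlksize);
    MatrixBlock mb2 = DataConverter.readMatrixFromHDFS(
        output("X"), FileFormat.BINARY, rows, cols, outBlksize, -1);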