You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/11/28 22:48:21 UTC

[systemds] branch master updated (c50165d -> 9641173)

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git.


    from c50165d  [MINOR] Singleton Federated SSL context
     new 02bd797  [SYSTEMDS-2744] Minor performance improvements sparse operations
     new 9641173  [SYSTEMDS-2550] Fix in-memory reblock for federated matrices/frames

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/sysds/hops/recompile/Recompiler.java    |  46 +++---
 .../apache/sysds/runtime/data/SparseRowVector.java |  12 ++
 .../spark/CSVReblockSPInstruction.java             |   6 +-
 .../instructions/spark/ReblockSPInstruction.java   |   6 +-
 .../apache/sysds/runtime/lineage/LineageItem.java  |   1 +
 .../sysds/runtime/matrix/data/LibMatrixMult.java   | 177 ++++++++++++---------
 .../sysds/runtime/matrix/data/LibMatrixReorg.java  |   5 +-
 .../sysds/runtime/matrix/data/MatrixBlock.java     |   5 +-
 .../test/component/frame/FrameCastingTest.java     |   2 -
 9 files changed, 144 insertions(+), 116 deletions(-)


[systemds] 02/02: [SYSTEMDS-2550] Fix in-memory reblock for federated matrices/frames

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 9641173dd54ba9d43e4869483006bfc4fc66897c
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Nov 28 23:47:53 2020 +0100

    [SYSTEMDS-2550] Fix in-memory reblock for federated matrices/frames
    
    This patch fixes the spark reblock instructions (always compiled in
    hybrid mode), which incorrectly consolidate federated matrices/frames
    into the driver. We now simply extended the implementation to respect
    existing federated data objects.
---
 .../apache/sysds/hops/recompile/Recompiler.java    | 46 +++++++++-------------
 .../spark/CSVReblockSPInstruction.java             |  6 +--
 .../instructions/spark/ReblockSPInstruction.java   |  6 +--
 3 files changed, 23 insertions(+), 35 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index c785cfc..6e960e7 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -68,6 +68,7 @@ import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -80,7 +81,6 @@ import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -1568,33 +1568,25 @@ public class Recompiler
 			&& !OptimizerUtils.exceedsCachingThreshold(dc.getCols(), OptimizerUtils.estimateSize(dc));
 	}
 	
-	public static void executeInMemoryMatrixReblock(ExecutionContext ec, String varin, String varout) {
-		MatrixObject in = ec.getMatrixObject(varin);
-		MatrixObject out = ec.getMatrixObject(varout);
+	@SuppressWarnings("unchecked")
+	public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) {
+		CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) ec.getCacheableData(varin);
+		CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) ec.getCacheableData(varout);
 
-		//read text input matrix (through buffer pool, matrix object carries all relevant
-		//information including additional arguments for csv reblock)
-		MatrixBlock mb = in.acquireRead(); 
-		
-		//set output (incl update matrix characteristics)
-		out.acquireModify( mb );
-		out.release();
-		in.release();
-	}
-	
-	public static void executeInMemoryFrameReblock(ExecutionContext ec, String varin, String varout) 
-	{
-		FrameObject in = ec.getFrameObject(varin);
-		FrameObject out = ec.getFrameObject(varout);
-
-		//read text input frame (through buffer pool, frame object carries all relevant
-		//information including additional arguments for csv reblock)
-		FrameBlock fb = in.acquireRead(); 
-		
-		//set output (incl update matrix characteristics)
-		out.acquireModify( fb );
-		out.release();
-		in.release();
+		if( in.isFederated() ) {
+			out.setMetaData(in.getMetaData());
+			out.setFedMapping(in.getFedMapping());
+		}
+		else {
+			//read text input matrix (through buffer pool, matrix object carries all relevant
+			//information including additional arguments for csv reblock)
+			CacheBlock mb = in.acquireRead();
+			
+			//set output (incl update matrix characteristics)
+			out.acquireModify(mb);
+			out.release();
+			in.release();
+		}
 	}
 	
 	private static void tryReadMetaDataFileDataCharacteristics( DataOp dop )
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
index d073a3c..be4adc3 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -114,10 +114,8 @@ public class CSVReblockSPInstruction extends UnarySPInstruction {
 
 		//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
 		if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-			if( input1.getDataType() == DataType.MATRIX )
-				Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName());
-			else if( input1.getDataType() == DataType.FRAME )
-				Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName());
+			if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
+				Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
 			Statistics.decrementNoOfExecutedSPInst();
 			return;
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
index 46ab52e..a27a760 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
@@ -97,10 +97,8 @@ public class ReblockSPInstruction extends UnarySPInstruction {
 
 		//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
 		if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-			if( input1.getDataType() == DataType.MATRIX )
-				Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName());
-			else if( input1.getDataType() == DataType.FRAME )
-				Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName());
+			if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
+				Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
 			Statistics.decrementNoOfExecutedSPInst();
 			return;
 		}


[systemds] 01/02: [SYSTEMDS-2744] Minor performance improvements sparse operations

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 02bd79742b7d932bbd351fcf78fbfbde9446afa7
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Nov 28 22:03:15 2020 +0100

    [SYSTEMDS-2744] Minor performance improvements sparse operations
    
    * Dense-sparse vector transpose
    * Sparse-sparse matrix multiplication with small right hand side
    * Avoid unnecessary overhead for scanning for permutation matmult
---
 .../apache/sysds/runtime/data/SparseRowVector.java |  12 ++
 .../apache/sysds/runtime/lineage/LineageItem.java  |   1 +
 .../sysds/runtime/matrix/data/LibMatrixMult.java   | 177 ++++++++++++---------
 .../sysds/runtime/matrix/data/LibMatrixReorg.java  |   5 +-
 .../sysds/runtime/matrix/data/MatrixBlock.java     |   5 +-
 .../test/component/frame/FrameCastingTest.java     |   2 -
 6 files changed, 121 insertions(+), 81 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java b/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
index 6d60293..91fab8e 100644
--- a/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
@@ -49,6 +49,18 @@ public final class SparseRowVector extends SparseRow implements Serializable
 		indexes = new int[capacity];
 	}
 	
+	public SparseRowVector(int nnz, double[] v, int vlen) {
+		values = new double[nnz];
+		indexes = new int[nnz];
+		for(int i=0, pos=0; i<vlen; i++)
+			if( v[i] != 0 ) {
+				values[pos] = v[i];
+				indexes[pos] = i;
+				pos++;
+			}
+		size = nnz;
+	}
+	
 	public SparseRowVector(int estnnz, int maxnnz) {
 		if( estnnz > initialCapacity )
 			estimatedNzs = estnnz;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
index f0fcad4..2fa3a22 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
@@ -183,6 +183,7 @@ public class LineageItem {
 		return ret;
 	}
 	
+	@SuppressWarnings("unused")
 	private boolean equalsLI(LineageItem that) {
 		if (isVisited() || this == that)
 			return true;
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
index f8d5436..a117a8d 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
@@ -1417,89 +1417,116 @@ public class LibMatrixMult
 		int cd = m1.clen;
 		
 		// MATRIX-MATRIX (VV, MV not applicable here because V always dense)
-		if(LOW_LEVEL_OPTIMIZATION)
-		{
-			if( pm2 && m==1 )          //VECTOR-MATRIX
-			{
-				//parallelization over rows in rhs matrix
-				if( !a.isEmpty(0) ) 
-				{
-					int alen = a.size(0);
-					int[] aix = a.indexes(0);
-					double[] avals = a.values(0);
-					double[] cvals = c.valuesAt(0);
-					int rlix = (rl==0) ? 0 : a.posFIndexGTE(0,rl);
-					rlix = (rlix>=0) ? rlix : alen;
-					
-					for( int k=rlix; k<alen && aix[k]<ru; k++ )
-						if( !b.isEmpty(aix[k]) ) {
-							int bpos = b.pos(aix[k]);
-							int blen = b.size(aix[k]);
-							int[] bix = b.indexes(aix[k]);
-							double[] bvals = b.values(aix[k]);
-							vectMultiplyAdd(avals[k], bvals, cvals, bix, bpos, 0, blen);
-						}
-				}
+		if(LOW_LEVEL_OPTIMIZATION) {
+			if( pm2 && m==1 )               //VECTOR-MATRIX
+				matrixMultSparseSparseVM(a, b, c, rl, ru);
+			else if( m2.nonZeros < 2048 )   //MATRIX-SMALL MATRIX
+				matrixMultSparseSparseMMSmallRHS(a, b, c, rl, ru);
+			else                            //MATRIX-MATRIX
+				matrixMultSparseSparseMM(a, b, c, m, cd, m1.nonZeros, rl, ru);
+		}
+		else {
+			matrixMultSparseSparseMMGeneric(a, b, c, rl, ru);
+		}
+	}
+	
+	private static void matrixMultSparseSparseVM(SparseBlock a, SparseBlock b, DenseBlock c, int rl, int ru) {
+		//parallelization over rows in rhs matrix
+		if( a.isEmpty(0) )
+			return;
+		
+		int alen = a.size(0);
+		int[] aix = a.indexes(0);
+		double[] avals = a.values(0);
+		double[] cvals = c.valuesAt(0);
+		int rlix = (rl==0) ? 0 : a.posFIndexGTE(0,rl);
+		rlix = (rlix>=0) ? rlix : alen;
+		
+		for( int k=rlix; k<alen && aix[k]<ru; k++ )
+			if( !b.isEmpty(aix[k]) ) {
+				int bpos = b.pos(aix[k]);
+				int blen = b.size(aix[k]);
+				int[] bix = b.indexes(aix[k]);
+				double[] bvals = b.values(aix[k]);
+				vectMultiplyAdd(avals[k], bvals, cvals, bix, bpos, 0, blen);
 			}
-			else                       //MATRIX-MATRIX
-			{
-				//block sizes for best-effort blocking w/ sufficient row reuse in B yet small overhead
-				final int blocksizeI = 32;
-				final int blocksizeK = Math.max(32, UtilFunctions.nextIntPow2(
-					(int)Math.pow((double)m*cd/m1.nonZeros,2)));
-				
-				//temporary array of current sparse positions
-				int[] curk = new int[Math.min(blocksizeI, ru-rl)];
-				
-				//blocked execution over IK 
-				for( int bi = rl; bi < ru; bi+=blocksizeI ) {
-					Arrays.fill(curk, 0); //reset positions
-					for( int bk = 0, bimin = Math.min(ru, bi+blocksizeI); bk < cd; bk+=blocksizeK ) {
-						final int bkmin = Math.min(cd, bk+blocksizeK); 
-						//core sub block matrix multiplication
-						for( int i=bi; i<bimin; i++ ) {
-							if( a.isEmpty(i) ) continue;
-							final int apos = a.pos(i);
-							final int alen = a.size(i);
-							int[] aix = a.indexes(i);
-							double[] avals = a.values(i);
-							double[] cvals = c.values(i);
-							int cix = c.pos(i);
-							int k = curk[i-bi] + apos;
-							for(; k < apos+alen && aix[k]<bkmin; k++) {
-								if( b.isEmpty(aix[k]) ) continue;
-								vectMultiplyAdd(avals[k], b.values(aix[k]), cvals,
-									b.indexes(aix[k]), b.pos(aix[k]), cix, b.size(aix[k]));
-							}
-							curk[i-bi] = k - apos;
-						}
+	}
+	
+	private static void matrixMultSparseSparseMMSmallRHS(SparseBlock a, SparseBlock b, DenseBlock c, int rl, int ru) {
+		for( int i=rl; i<Math.min(ru, a.numRows()); i++ ) {
+			if( a.isEmpty(i) ) continue;
+			final int apos = a.pos(i);
+			final int alen = a.size(i);
+			int[] aix = a.indexes(i);
+			double[] avals = a.values(i);
+			double[] cvals = c.values(i);
+			int cix = c.pos(i);
+			for(int k = apos; k < apos+alen; k++) {
+				int aixk = aix[k];
+				if( b.isEmpty(aixk) ) continue;
+				vectMultiplyAdd(avals[k], b.values(aixk), cvals,
+					b.indexes(aixk), b.pos(aixk), cix, b.size(aixk));
+			}
+		}
+	}
+	
+	private static void matrixMultSparseSparseMM(SparseBlock a, SparseBlock b, DenseBlock c, int m, int cd, long nnz1, int rl, int ru) {
+		//block sizes for best-effort blocking w/ sufficient row reuse in B yet small overhead
+		final int blocksizeI = 32;
+		final int blocksizeK = Math.max(32,
+			UtilFunctions.nextIntPow2((int)Math.pow((double)m*cd/nnz1, 2)));
+		
+		//temporary array of current sparse positions
+		int[] curk = new int[Math.min(blocksizeI, ru-rl)];
+		
+		//blocked execution over IK 
+		for( int bi = rl; bi < ru; bi+=blocksizeI ) {
+			Arrays.fill(curk, 0); //reset positions
+			for( int bk = 0, bimin = Math.min(ru, bi+blocksizeI); bk < cd; bk+=blocksizeK ) {
+				final int bkmin = Math.min(cd, bk+blocksizeK); 
+				//core sub block matrix multiplication
+				for( int i=bi; i<bimin; i++ ) {
+					if( a.isEmpty(i) ) continue;
+					final int apos = a.pos(i);
+					final int alen = a.size(i);
+					int[] aix = a.indexes(i);
+					double[] avals = a.values(i);
+					double[] cvals = c.values(i);
+					int cix = c.pos(i);
+					int k = curk[i-bi] + apos;
+					for(; k < apos+alen && aix[k]<bkmin; k++) {
+						if( b.isEmpty(aix[k]) ) continue;
+						vectMultiplyAdd(avals[k], b.values(aix[k]), cvals,
+							b.indexes(aix[k]), b.pos(aix[k]), cix, b.size(aix[k]));
 					}
+					curk[i-bi] = k - apos;
 				}
 			}
 		}
-		else {
-			for( int i=rl; i<Math.min(ru, a.numRows()); i++ ) {
-				if( a.isEmpty(i) ) continue;
-				int apos = a.pos(i);
-				int alen = a.size(i);
-				int[] aix = a.indexes(i);
-				double[] avals = a.values(i);
-				double[] cvals = c.values(i);
-				int cix = c.pos(i);
-				for(int k = apos; k < apos+alen; k++) {
-					if( b.isEmpty(aix[k]) ) continue;
-					double val = avals[k];
-					int bpos = b.pos(aix[k]);
-					int blen = b.size(aix[k]);
-					int[] bix = b.indexes(aix[k]);
-					double[] bvals = b.values(aix[k]);
-					for(int j = bpos; j < bpos+blen; j++)
-						cvals[cix+bix[j]] += val * bvals[j];
-				}
+	}
+	
+	private static void matrixMultSparseSparseMMGeneric(SparseBlock a, SparseBlock b, DenseBlock c, int rl, int ru) {
+		for( int i=rl; i<Math.min(ru, a.numRows()); i++ ) {
+			if( a.isEmpty(i) ) continue;
+			int apos = a.pos(i);
+			int alen = a.size(i);
+			int[] aix = a.indexes(i);
+			double[] avals = a.values(i);
+			double[] cvals = c.values(i);
+			int cix = c.pos(i);
+			for(int k = apos; k < apos+alen; k++) {
+				if( b.isEmpty(aix[k]) ) continue;
+				double val = avals[k];
+				int bpos = b.pos(aix[k]);
+				int blen = b.size(aix[k]);
+				int[] bix = b.indexes(aix[k]);
+				double[] bvals = b.values(aix[k]);
+				for(int j = bpos; j < bpos+blen; j++)
+					cvals[cix+bix[j]] += val * bvals[j];
 			}
 		}
 	}
-
+	
 	/**
 	 * This implementation applies to any combination of dense/sparse if at least one
 	 * input is ultrasparse (sparse and very few nnz). In that case, most importantly,
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
index 891dfbe..edc38a7 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
@@ -25,6 +25,7 @@ import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.DenseBlockFactory;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseBlockCSR;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.functionobjects.DiagIndex;
 import org.apache.sysds.runtime.functionobjects.RevIndex;
 import org.apache.sysds.runtime.functionobjects.SortIndex;
@@ -819,8 +820,8 @@ public class LibMatrixReorg
 		
 		if( out.rlen == 1 ) //VECTOR-VECTOR
 		{
-			c.allocate(0, (int)in.nonZeros); 
-			c.setIndexRange(0, 0, m, a.valuesAt(0), 0, m);
+			//allocate row once by nnz, copy non-zeros
+			c.set(0, new SparseRowVector((int)in.nonZeros, a.valuesAt(0), m), false);
 		}
 		else //general case: MATRIX-MATRIX
 		{
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index e5334fb..4cea827 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -959,7 +959,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	}
 	
 	public boolean isSparsePermutationMatrix() {
-		if( !isInSparseFormat() )
+		if( !isInSparseFormat() || nonZeros > rlen )
 			return false;
 		SparseBlock sblock = getSparseBlock();
 		boolean isPM = (sblock != null);
@@ -1040,8 +1040,9 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		boolean sparseDst = evalSparseFormatInMemory(); 
 		
 		//check for empty blocks (e.g., sparse-sparse)
-		if( isEmptyBlock(false) )
+		if( isEmptyBlock(false) ) {
 			cleanupBlock(true, true);
+		}
 		
 		//change representation if required (also done for 
 		//empty blocks in order to set representation flags)
diff --git a/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java b/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
index 9244fb1..2cf1c4e 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
@@ -29,8 +29,6 @@ import org.apache.sysds.runtime.util.UtilFunctions;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestUtils;
 
-import java.util.Arrays;
-
 public class FrameCastingTest extends AutomatedTestBase
 {
 	private final static int rows = 2891;