You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/07/27 22:43:17 UTC

systemml git commit: [SYSTEMML-2291] Performance layered-graph sparsity estimator

Repository: systemml
Updated Branches:
  refs/heads/master e0c271fe4 -> 8a5bdba43


[SYSTEMML-2291] Performance layered-graph sparsity estimator

This patch significantly improves the build and estimate performance of
the sparsity estimator based on layered graphs (by more than 1000x and
about 100x, respectively). In particular, it includes (1) building the
graph with array-based node partitions for O(1) access, (2) reduced node
sizes, and (3) proper memoization when propagating r-vectors bottom-up
through the graph.

In a scenario that estimates the output sparsity of a self matrix product
of a 10K x 10K matrix with sparsity 0.01, this patch improves the
build/estimate times from 798s/120s to 87ms/1.1s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8a5bdba4
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8a5bdba4
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8a5bdba4

Branch: refs/heads/master
Commit: 8a5bdba432350792e0d78aa96825be92596a58cf
Parents: e0c271f
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Jul 27 15:44:51 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Jul 27 15:44:51 2018 -0700

----------------------------------------------------------------------
 .../sysml/hops/estim/EstimatorLayeredGraph.java | 261 +++++++++----------
 .../paramserv/spark/rpc/PSRpcFactory.java       |   2 +-
 2 files changed, 119 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/8a5bdba4/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java b/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java
index 9b25a74..babffe8 100644
--- a/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java
+++ b/src/main/java/org/apache/sysml/hops/estim/EstimatorLayeredGraph.java
@@ -21,11 +21,16 @@ package org.apache.sysml.hops.estim;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.math3.distribution.ExponentialDistribution;
 import org.apache.commons.math3.random.Well1024a;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * This estimator implements an approach based on a so-called layered graph,
@@ -59,159 +64,129 @@ public class EstimatorLayeredGraph extends SparsityEstimator {
 
 	@Override
 	public double estim(MatrixBlock m1, MatrixBlock m2){
-		int layer = 3;
-		LayeredGraph LGraph = new LayeredGraph(m1, m2);
-		//lambda is not the mean, if lambda is 2 hand in 1/2
-		ExponentialDistribution random = new ExponentialDistribution(new Well1024a(), 1);
-		for (int h = 0; h < LGraph.nodes.size(); h++) {
-			if (LGraph.nodes.get(h).getY() == 1) {
-				double[] doubArray = new double[_rounds];
-				for (int g = 0; g < _rounds; g++)
-					doubArray[g] = random.sample();
-				LGraph.nodes.get(h).setVector(doubArray);
-			}
-		}
-		// get r for nodes of upper layer
-		for (int h = 0; h < LGraph.nodes.size(); h++) {
-			if (LGraph.nodes.get(h).getY() == layer) {
-				double[] ret = recr(_rounds, LGraph.nodes.get(h));
-				if(ret != null)
-					LGraph.nodes.get(h).setVector(ret);
-				LGraph.nodes.get(h).setValue(
-					calcNNZ(LGraph.nodes.get(h).getVector(), _rounds));
-			}
-		}
-		//calc final sparsity
-		double nnz = LGraph.nodes.stream().filter(n -> n.getY()==layer)
-			.mapToDouble(n -> n.getValue()).sum();
-		return nnz / m1.getNumRows() / m2.getNumColumns();
+		LayeredGraph graph = new LayeredGraph(m1, m2, _rounds);
+		return OptimizerUtils.getSparsity(m1.getNumRows(),
+			m2.getNumColumns(), graph.estimateNnz());
 	}
-	
-	
-	public double[] recr(int numr, Node tempnode) {
-		if (tempnode.getInput().isEmpty())
-			return (tempnode.getY() == 1) ? tempnode.getVector() : null;
-		else if (tempnode.getInput().size() == 1)
-			return recr(numr, tempnode.getInput().get(0));
-		else {
-			return tempnode.getInput().stream()
-				.map(n -> recr(numr, n)).filter(v -> v != null)
-				.reduce((v1,v2) -> min(v1,v2)).get();
-		}
-	}
-	
-	private double[] min(double[] v1, double[] v2) {
-		double[] ret = new double[v1.length];
-		for(int i=0; i<v1.length; i++)
-			ret[i] = Math.min(v1[i], v2[i]);
-		return ret;
-	}
-
-	public double calcNNZ(double[] inpvec, int numr) {
-		return (inpvec != null && inpvec.length > 0) ?
-			(numr - 1) / Arrays.stream(inpvec).sum() : 0;
-	}
-
-	private class LayeredGraph {
-		List<Node> nodes = new ArrayList<>();
 
-		public LayeredGraph(MatrixBlock m1, MatrixBlock m2) {
-			createNodes(m1, 1, nodes);
-			createNodes(m2, 2, nodes);
+	private static class LayeredGraph {
+		private final List<Node[]> _nodes; //nodes partitioned by graph level
+		private final int _rounds;         //length of propagated r-vectors 
+		
+		public LayeredGraph(MatrixBlock m1, MatrixBlock m2, int r) {
+			_nodes = new ArrayList<>();
+			_rounds = r;
+			buildNext(m1);
+			buildNext(m2);
 		}
-	}
-
-	public void createNodes(MatrixBlock m, int mpos, List<Node> nodes) {
-		if( m.isEmpty() )
-			return;
 		
-		Node nodeout = null;
-		Node nodein = null;
-		//TODO perf: separate handling sparse and dense
-		//TODO perf: hash lookups for existing nodes
-		for (int i = 0; i < m.getNumRows(); i++) {
-			for (int j = 0; j < m.getNumColumns(); j++) {
-				if (m.getValue(i, j) == 0) continue;
-				boolean alreadyExists = false;
-				boolean alreadyExists2 = false;
-				for (int k = 0; k < nodes.size(); k++) {
-					if (nodes.get(k).getX() == i && nodes.get(k).getY() == mpos) {
-						alreadyExists = true;
-					}
-				}
-				if (!alreadyExists) {
-					nodein = new Node(i, mpos);
-					nodes.add(nodein);
-				} else {
-					for (int k = 0; k < nodes.size(); k++) {
-						if (nodes.get(k).getX() == i && nodes.get(k).getY() == mpos) {
-							nodein = nodes.get(k);
-						}
-					}
-				}
-				for (int k = 0; k < nodes.size(); k++) {
-					if (nodes.get(k).getX() == j && nodes.get(k).getY() == mpos + 1) {
-						alreadyExists2 = true;
-					}
+		public void buildNext(MatrixBlock mb) { //appends the node layer(s) and edges for the next matrix mb
+			if( mb.isEmpty() ) //NOTE(review): no layer is added for an empty input -- confirm estimateNnz copes with the missing layer
+				return;
+			final int m = mb.getNumRows();
+			final int n = mb.getNumColumns();
+			
+			//step 1: create node arrays for rows/cols (rows reuse the last existing layer, if any)
+			Node[] rows = null, cols = null;
+			if( _nodes.size() == 0 ) {
+				rows = new Node[m];
+				for(int i=0; i<m; i++)
+					rows[i] = new Node();
+				_nodes.add(rows);
+			}
+			else {
+				rows = _nodes.get(_nodes.size()-1);
+			}
+			cols = new Node[n];
+			for(int j=0; j<n; j++)
+				cols[j] = new Node();
+			_nodes.add(cols);
+			
+			//step 2: create edges for non-zero values (column node j gets row node i as input if cell (i,j) is non-zero)
+			if( mb.isInSparseFormat() ) {
+				SparseBlock a = mb.getSparseBlock();
+				for(int i=0; i < m; i++) {
+					if( a.isEmpty(i) ) continue;
+					int apos = a.pos(i);
+					int alen = a.size(i);
+					int[] aix = a.indexes(i);
+					for(int k=apos; k<apos+alen; k++)
+						cols[aix[k]].addInput(rows[i]);
 				}
-				if (!alreadyExists2) {
-					nodeout = new Node(j, mpos + 1);
-					nodes.add(nodeout);
-
-				} else {
-					for (int k = 0; k < nodes.size(); k++) {
-						if (nodes.get(k).getX() == j && nodes.get(k).getY() == mpos + 1) {
-							nodeout = nodes.get(k);
-						}
-					}
+			}
+			else { //dense
+				DenseBlock a = mb.getDenseBlock();
+				for (int i=0; i<m; i++) {
+					double[] avals = a.values(i);
+					int aix = a.pos(i);
+					for (int j=0; j<n; j++) //scan all n columns of row i (bound must be n, not the row count m)
+						if( avals[aix+j] != 0 )
+							cols[j].addInput(rows[i]);
 				}
-				nodeout.addnz(nodein);
 			}
 		}
-	}
-
-	private static class Node {
-		int xpos;
-		int ypos;
-		double[] r_vector;
-		List<Node> input = new ArrayList<>();
-		double value;
-
-		public Node(int x, int y) {
-			xpos = x;
-			ypos = y;
-		}
-
-		public void setValue(double inp) {
-			value = inp;
-		}
-
-		public double getValue() {
-			return value;
-		}
-
-		public List<Node> getInput() {
-			return input;
-		}
-
-		public int getX() {
-			return xpos;
-		}
-
-		public int getY() {
-			return ypos;
-		}
-
-		public double[] getVector() {
-			return r_vector;
+		
+		public long estimateNnz() { //estimates the number of non-zeros represented by the last graph layer
+			//step 1: assign random vectors ~exp(lambda=1) of length _rounds to all leaf nodes (first layer)
+			//(note: the constructor argument is the mean 1/lambda, e.g., lambda=2 requires passing 1/2)
+			ExponentialDistribution random = new ExponentialDistribution(new Well1024a(), 1);
+			for( Node n : _nodes.get(0) ) {
+				double[] rvect = new double[_rounds];
+				for (int g = 0; g < _rounds; g++)
+					rvect[g] = random.sample();
+				n.setVector(rvect);
+			}
+			
+			//step 2: propagate vectors bottom-up and aggregate nnz (sum of per-node estimates over the last layer)
+			return (long) Arrays.stream(_nodes.get(_nodes.size()-1)) //the double sum is truncated by the long cast
+				.mapToDouble(n -> calcNNZ(n.computeVector(_rounds), _rounds)).sum();
 		}
-
-		public void setVector(double[] r_input) {
-			r_vector = r_input;
+		
+		private static double calcNNZ(double[] inpvec, int rounds) {
+			return (inpvec != null && inpvec.length > 0) ?
+				(rounds - 1) / Arrays.stream(inpvec).sum() : 0;
 		}
-
-		public void addnz(Node dest) {
-			input.add(dest);
+		
+		private static class Node {
+			private List<Node> _input = new ArrayList<>();
+			private double[] _rvect;
+			
+			public List<Node> getInput() {
+				return _input;
+			}
+			
+			public double[] getVector() {
+				return _rvect;
+			}
+		
+			public void setVector(double[] rvect) {
+				_rvect = rvect;
+			}
+		
+			public void addInput(Node dest) {
+				_input.add(dest);
+			}
+			
+			private double[] computeVector(int rounds) { //returns the memoized r-vector of this node, or null if no input yields a vector
+				if( _rvect != null || getInput().isEmpty() )
+					return _rvect;
+				//recursively compute input vectors, dropping inputs without a vector (null)
+				List<double[]> ltmp = getInput().stream().map(n -> n.computeVector(rounds))
+					.filter(v -> v!=null).collect(Collectors.toList());
+				if( ltmp.isEmpty() )
+					return null;
+				else if( ltmp.size() == 1 )
+					return _rvect = ltmp.get(0);
+				else { //element-wise minimum over all non-null input vectors
+					double[] tmp = ltmp.get(0).clone();
+					for(int i=1; i<ltmp.size(); i++) { //iterate the filtered list, not _input, to avoid null vectors
+						double[] v2 = ltmp.get(i);
+						for(int j=0; j<rounds; j++)
+							tmp[j] = Math.min(tmp[j], v2[j]);
+					}
+					return _rvect = tmp;
+				}
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/8a5bdba4/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java
index 2d921de..5e76d23 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/rpc/PSRpcFactory.java
@@ -36,7 +36,7 @@ public class PSRpcFactory {
 	private static final String MODULE_NAME = "ps";
 
 	private static TransportContext createTransportContext(SparkConf conf, LocalParamServer ps) {
-		TransportConf tc = SparkTransportConf.fromSparkConf(conf, MODULE_NAME, 0);;
+		TransportConf tc = SparkTransportConf.fromSparkConf(conf, MODULE_NAME, 0);
 		PSRpcHandler handler = new PSRpcHandler(ps);
 		return new TransportContext(tc, handler);
 	}