Posted to commits@systemds.apache.org by ar...@apache.org on 2022/02/03 19:53:52 UTC

[systemds] branch main updated: [SYSTEMDS-3289] Multithreaded equi-height binning

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new fb5126a  [SYSTEMDS-3289] Multithreaded equi-height binning
fb5126a is described below

commit fb5126a21ac249f04309f50c8ecab218c7104781
Author: arnabp <ar...@tugraz.at>
AuthorDate: Thu Feb 3 20:44:08 2022 +0100

    [SYSTEMDS-3289] Multithreaded equi-height binning
    
    This patch adds multithreaded support for equi-height binning
    in transformencode/apply. Each worker thread sorts a partition
    of the input column, and the sorted blocks are then combined
    via a heap-based k-way merge.
    
    Closes #1495.
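
For context, the core technique named above works as follows: each
build task sorts its block of rows, and a min-heap then merges the k
sorted blocks in O(n log k). Below is a minimal standalone sketch of
the same idea as the mergeKSortedArrays/ArrayContainer code added in
ColumnEncoderBin (class and method names here are illustrative, and
the sketch adds an empty-partition guard the patch itself omits):

import java.util.Arrays;
import java.util.PriorityQueue;

public class KWayMergeSketch {
	// A sorted array plus a cursor; containers order by the element
	// currently under the cursor.
	private static final class Cursor implements Comparable<Cursor> {
		final double[] arr;
		int idx;
		Cursor(double[] arr, int idx) { this.arr = arr; this.idx = idx; }
		@Override
		public int compareTo(Cursor o) {
			return Double.compare(arr[idx], o.arr[o.idx]);
		}
	}

	// Merge k individually sorted arrays via a min-heap of cursors.
	static double[] merge(double[][] parts) {
		PriorityQueue<Cursor> heap = new PriorityQueue<>();
		int total = 0;
		for (double[] p : parts) {
			if (p.length > 0)
				heap.add(new Cursor(p, 0));
			total += p.length;
		}
		double[] out = new double[total];
		int m = 0;
		while (!heap.isEmpty()) {
			Cursor c = heap.poll();      // globally smallest remaining element
			out[m++] = c.arr[c.idx];
			if (++c.idx < c.arr.length)  // advance cursor and re-insert
				heap.add(c);
		}
		return out;
	}

	public static void main(String[] args) {
		double[][] parts = {{1, 4, 9}, {2, 3, 10}, {0, 5}};
		System.out.println(Arrays.toString(merge(parts)));
		// -> [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 9.0, 10.0]
	}
}

Sorting p blocks of roughly n/p rows costs O((n/p) log(n/p)) per
thread, so the merge dominates only for large p; this is the standard
multi-way merge pattern from external sorting.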
---
 ...ltiReturnParameterizedBuiltinSPInstruction.java |  24 ++--
 .../runtime/transform/encode/ColumnEncoderBin.java | 137 ++++++++++++++++-----
 .../TransformFrameBuildMultithreadedTest.java      |  28 +++--
 3 files changed, 144 insertions(+), 45 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 9c8c08b..bca0825 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -315,16 +315,22 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 			}
 			// handle bin boundaries
 			else if(_encoder.containsEncoderForID(colID, ColumnEncoderBin.class)) {
-				double min = Double.MAX_VALUE;
-				double max = -Double.MAX_VALUE;
-				while(iter.hasNext()) {
-					double value = Double.parseDouble(iter.next().toString());
-					min = Math.min(min, value);
-					max = Math.max(max, value);
-				}
 				ColumnEncoderBin baEncoder = _encoder.getColumnEncoder(colID, ColumnEncoderBin.class);
-				assert baEncoder != null;
-				baEncoder.computeBins(min, max);
+				assert baEncoder != null;
+				if (baEncoder.getBinMethod() == ColumnEncoderBin.BinMethod.EQUI_WIDTH) {
+					double min = Double.MAX_VALUE;
+					double max = -Double.MAX_VALUE;
+					while(iter.hasNext()) {
+						double value = Double.parseDouble(iter.next().toString());
+						min = Math.min(min, value);
+						max = Math.max(max, value);
+					}
+					baEncoder.computeBins(min, max);
+				}
+				else //TODO: support equi-height
+					throw new DMLRuntimeException("Binning method "+baEncoder.getBinMethod().toString()
+						+" is not supported for Spark");
+
 				double[] binMins = baEncoder.getBinMins();
 				double[] binMaxs = baEncoder.getBinMaxs();
 				for(int i = 0; i < binMins.length; i++) {
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index fdc1ad6..cb6e0af 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -26,6 +26,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.lang3.tuple.MutableTriple;
@@ -43,7 +44,6 @@ public class ColumnEncoderBin extends ColumnEncoder {
 	private static final long serialVersionUID = 1917445005206076078L;
 	protected int _numBin = -1;
 	private BinMethod _binMethod = BinMethod.EQUI_WIDTH;
-	private double[] _sortedInput = null;
 
 	// frame transform-apply attributes
 	// a) column bin boundaries
@@ -86,6 +86,17 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		return _binMaxs;
 	}
 
+	public BinMethod getBinMethod() {
+		return _binMethod;
+	}
+
+	public void setBinMethod(String method) {
+		if (method.equalsIgnoreCase(BinMethod.EQUI_WIDTH.toString()))
+			_binMethod = BinMethod.EQUI_WIDTH;
+		if (method.equalsIgnoreCase(BinMethod.EQUI_HEIGHT.toString()))
+			_binMethod = BinMethod.EQUI_HEIGHT;
+	}
+
 	@Override
 	public void build(CacheBlock in) {
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -96,8 +107,8 @@ public class ColumnEncoderBin extends ColumnEncoder {
 			computeBins(pairMinMax[0], pairMinMax[1]);
 		}
 		else if(_binMethod == BinMethod.EQUI_HEIGHT) {
-			prepareDataForEqualHeightBins(in, _colID, 0, -1);
-			computeEqualHeightBins();
+			double[] sortedCol = prepareDataForEqualHeightBins(in, _colID, 0, -1);
+			computeEqualHeightBins(sortedCol);
 		}
 
 		if(DMLScript.STATISTICS)
@@ -177,18 +188,19 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		return new double[] {min, max};
 	}
 
-	private void prepareDataForEqualHeightBins(CacheBlock in, int colID, int startRow, int blockSize) {
-		int numRows = getEndIndex(in.getNumRows(), startRow, blockSize) - startRow;
-		_sortedInput = new double[numRows];
-		for(int i = startRow; i < numRows; i++) {
+	private static double[] prepareDataForEqualHeightBins(CacheBlock in, int colID, int startRow, int blockSize) {
+		int endRow = getEndIndex(in.getNumRows(), startRow, blockSize);
+		double[] vals = new double[endRow-startRow];
+		for(int i = startRow; i < endRow; i++) {
 			double inVal = in.getDouble(i, colID - 1);
 			//FIXME current NaN handling introduces 0s and thus
 			// impacts the computation of bin boundaries
 			if(Double.isNaN(inVal))
 				continue;
-			_sortedInput[i] = inVal;
+			vals[i-startRow] = inVal;
 		}
-		Arrays.sort(_sortedInput);
+		Arrays.sort(vals);
+		return vals;
 	}
 
 	@Override
@@ -197,9 +209,9 @@ public class ColumnEncoderBin extends ColumnEncoder {
 	}
 
 	@Override
-	public Callable<Object> getPartialBuildTask(CacheBlock in, int startRow, int blockSize, 
+	public Callable<Object> getPartialBuildTask(CacheBlock in, int startRow, int blockSize,
 			HashMap<Integer, Object> ret) {
-		return new BinPartialBuildTask(in, _colID, startRow, blockSize, ret);
+		return new BinPartialBuildTask(in, _colID, startRow, blockSize, _binMethod, ret);
 	}
 
 	@Override
@@ -219,20 +231,20 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		}
 	}
 
-	private void computeEqualHeightBins() {
+	private void computeEqualHeightBins(double[] sortedCol) {
 		if(_binMins == null || _binMaxs == null) {
 			_binMins = new double[_numBin];
 			_binMaxs = new double[_numBin];
 		}
-		int n = _sortedInput.length;
+		int n = sortedCol.length;
 		for(int i = 0; i < _numBin; i++) {
 			double pos = n * (i + 1d) / _numBin;
 			_binMaxs[i] = (pos % 1 == 0) ? // pos is integer
-				_sortedInput[(int) pos-1] :
-				_sortedInput[(int) Math.floor(pos)];
+				sortedCol[(int) pos-1] :
+				sortedCol[(int) Math.floor(pos)];
 		}
-		_binMaxs[_numBin-1] = _sortedInput[n-1];
-		_binMins[0] = _sortedInput[0];
+		_binMaxs[_numBin-1] = sortedCol[n-1];
+		_binMins[0] = sortedCol[0];
 		System.arraycopy(_binMaxs, 0, _binMins, 1, _numBin - 1);
 	}
 
@@ -324,6 +336,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		super.writeExternal(out);
 
 		out.writeInt(_numBin);
+		out.writeUTF(_binMethod.toString());
 		out.writeBoolean(_binMaxs != null);
 		if(_binMaxs != null) {
 			for(int j = 0; j < _binMaxs.length; j++) {
@@ -337,6 +350,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
 	public void readExternal(ObjectInput in) throws IOException {
 		super.readExternal(in);
 		_numBin = in.readInt();
+		setBinMethod(in.readUTF());
 		boolean minmax = in.readBoolean();
 		_binMaxs = minmax ? new double[_numBin] : null;
 		_binMins = minmax ? new double[_numBin] : null;
@@ -385,24 +399,34 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		private final int _blockSize;
 		private final int _startRow;
 		private final int _colID;
-		private final HashMap<Integer, Object> _partialMinMax;
+		private final BinMethod _method;
+		private final HashMap<Integer, Object> _partialData;
 
 		// if a pool is passed the task may be split up into multiple smaller tasks.
 		protected BinPartialBuildTask(CacheBlock input, int colID, int startRow, 
-				int blocksize, HashMap<Integer, Object> partialMinMax) {
+				int blocksize, BinMethod method, HashMap<Integer, Object> partialData) {
 			_input = input;
 			_blockSize = blocksize;
 			_colID = colID;
 			_startRow = startRow;
-			_partialMinMax = partialMinMax;
+			_method = method;
+			_partialData = partialData;
 		}
 
 		@Override
 		public double[] call() throws Exception {
 			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-			double[] minMax = getMinMaxOfCol(_input, _colID, _startRow, _blockSize);
-			synchronized (_partialMinMax){
-				_partialMinMax.put(_startRow, minMax);
+			if (_method == BinMethod.EQUI_WIDTH) {
+				double[] minMax = getMinMaxOfCol(_input, _colID, _startRow, _blockSize);
+				synchronized(_partialData) {
+					_partialData.put(_startRow, minMax);
+				}
+			}
+			if (_method == BinMethod.EQUI_HEIGHT) {
+				double[] sortedVals = prepareDataForEqualHeightBins(_input, _colID, _startRow, _blockSize);
+				synchronized(_partialData) {
+					_partialData.put(_startRow, sortedVals);
+				}
 			}
 			if (DMLScript.STATISTICS)
 				TransformStatistics.incBinningBuildTime(System.nanoTime()-t0);
@@ -424,16 +448,56 @@ public class ColumnEncoderBin extends ColumnEncoder {
 			_encoder = encoderBin;
 		}
 
+			// Java's PriorityQueue serves as the binary min-heap
+			//PriorityQueue is heap in Java
+			PriorityQueue<ArrayContainer> queue;
+			queue = new PriorityQueue<>();
+			int total=0;
+
+			//add arrays to heap
+			for(double[] arr : arrs) {
+				queue.add(new ArrayContainer(arr, 0));
+				total = total + arr.length;
+			}
+			int m=0;
+			double[] result = new double[total];
+
+			//while heap is not empty
+			while(!queue.isEmpty()){
+				ArrayContainer ac = queue.poll();
+				result[m++]=ac.arr[ac.index];
+				if(ac.index < ac.arr.length-1){
+					queue.add(new ArrayContainer(ac.arr, ac.index+1));
+				}
+			}
+			return result;
+		}
+
 		@Override
 		public Object call() throws Exception {
 			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-			double min = Double.POSITIVE_INFINITY;
-			double max = Double.NEGATIVE_INFINITY;
-			for(Object minMax : _partialMaps.values()) {
-				min = Math.min(min, ((double[]) minMax)[0]);
-				max = Math.max(max, ((double[]) minMax)[1]);
+			if (_encoder.getBinMethod() == BinMethod.EQUI_WIDTH) {
+				double min = Double.POSITIVE_INFINITY;
+				double max = Double.NEGATIVE_INFINITY;
+				for(Object minMax : _partialMaps.values()) {
+					min = Math.min(min, ((double[]) minMax)[0]);
+					max = Math.max(max, ((double[]) minMax)[1]);
+				}
+				_encoder.computeBins(min, max);
+			}
+
+			if (_encoder.getBinMethod() == BinMethod.EQUI_HEIGHT) {
+				double[][] allParts = new double[_partialMaps.size()][];
+				int i = 0;
+				for (Object arr: _partialMaps.values())
+					allParts[i++] = (double[]) arr;
+
+				// Heap-based merging of sorted partitions.
+				// TODO: Derive bin boundaries from partial aggregates, avoiding
+				// materializing the sorted arrays (e.g. federated quantile)
+				double[] sortedRes = mergeKSortedArrays(allParts);
+				_encoder.computeEqualHeightBins(sortedRes);
 			}
-			_encoder.computeBins(min, max);
 
 			if(DMLScript.STATISTICS)
 				TransformStatistics.incBinningBuildTime(System.nanoTime()-t0);
@@ -446,6 +510,21 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		}
 	}
 
+	private static class ArrayContainer implements Comparable<ArrayContainer> {
+		double[] arr;
+		int index;
+
+		public ArrayContainer(double[] arr, int index) {
+			this.arr = arr;
+			this.index = index;
+		}
+
+		@Override
+		public int compareTo(ArrayContainer o) {
+			return this.arr[this.index] < o.arr[o.index] ? -1 : 1;
+		}
+	}
+
 	private static class ColumnBinBuildTask implements Callable<Object> {
 		private final ColumnEncoderBin _encoder;
 		private final CacheBlock _input;
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
index 0c235d6..27d7c78 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
@@ -52,7 +52,8 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 	//private final static String SPEC1b = "homes3/homes.tfspec_recode2.json";
 	private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
 	//private final static String SPEC2b = "homes3/homes.tfspec_dummy2.json";
-	private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // recode
+	private final static String SPEC3a = "homes3/homes.tfspec_bin.json"; // equi-width binning
+	private final static String SPEC3b = "homes3/homes.tfspec_bin_height.json"; // equi-height binning
 	//private final static String SPEC3b = "homes3/homes.tfspec_bin2.json"; // recode
 	private final static String SPEC6 = "homes3/homes.tfspec_recode_dummy.json";
 	//private final static String SPEC6b = "homes3/homes.tfspec_recode_dummy2.json";
@@ -65,7 +66,7 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 	private final static String SPEC10 = "homes3/homes.tfspec_recode_bin.json";
 
 	public enum TransformType {
-		RECODE, DUMMY, RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE, RECODE_BIN,
+		RECODE, DUMMY, RECODE_DUMMY, BIN_WIDTH, BIN_HEIGHT, BIN_DUMMY, HASH, HASH_RECODE, RECODE_BIN,
 	}
 
 	@Override
@@ -101,12 +102,21 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 
 	@Test
 	public void testHomesBuildBinSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN, 0);
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_WIDTH, 0);
 	}
 
 	@Test
 	public void testHomesBuild50BinSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN, 50);
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_WIDTH, 50);
+	}
+
+	@Test
+	public void testHomesBuildBinEQHTCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_HEIGHT, 0);
+	}
+
+	@Test
+	public void testHomesBuild50BinEQHTCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_HEIGHT, 50);
 	}
 
 	@Test
@@ -132,8 +142,12 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 				SPEC = SPEC2;
 				DATASET = DATASET1;
 				break;
-			case BIN:
-				SPEC = SPEC3;
+			case BIN_WIDTH:
+				SPEC = SPEC3a;
+				DATASET = DATASET1;
+				break;
+			case BIN_HEIGHT:
+				SPEC = SPEC3b;
 				DATASET = DATASET1;
 				break;
 			case RECODE_DUMMY:
@@ -191,7 +205,7 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 					assertEquals(encodersS.get(i).getRcdMap().keySet(), encodersM.get(i).getRcdMap().keySet());
 				}
 			}
-			else if(type == TransformType.BIN) {
+			else if(type == TransformType.BIN_WIDTH || type == TransformType.BIN_HEIGHT) {
 				List<ColumnEncoderBin> encodersS = encoderS.getColumnEncoders(ColumnEncoderBin.class);
 				List<ColumnEncoderBin> encodersM = encoderM.getColumnEncoders(ColumnEncoderBin.class);
 				assertEquals(encodersS.size(), encodersM.size());
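
For reference, computeEqualHeightBins (above) places the i-th upper
bin boundary at quantile position n*(i+1)/numBin of the merged sorted
column. A minimal standalone sketch of that selection logic with a
worked example (class and method names are illustrative):

import java.util.Arrays;

public class EquiHeightSketch {
	// Boundary selection over an already-sorted column, mirroring the
	// pos = n*(i+1)/numBin logic of computeEqualHeightBins.
	static double[] equiHeightMaxs(double[] sorted, int numBin) {
		int n = sorted.length;
		double[] binMaxs = new double[numBin];
		for (int i = 0; i < numBin; i++) {
			double pos = n * (i + 1d) / numBin;
			binMaxs[i] = (pos % 1 == 0) ? // pos falls exactly on an element
				sorted[(int) pos - 1] :
				sorted[(int) Math.floor(pos)];
		}
		binMaxs[numBin - 1] = sorted[n - 1]; // last boundary = column max
		return binMaxs;
	}

	public static void main(String[] args) {
		// 4 bins over 8 sorted values -> upper boundaries [2, 4, 6, 8],
		// i.e. two values per bin; the bin mins are the column min
		// followed by the preceding maxs, as in the arraycopy above.
		double[] col = {1, 2, 3, 4, 5, 6, 7, 8};
		System.out.println(Arrays.toString(equiHeightMaxs(col, 4)));
	}
}

When n is not divisible by numBin the position is fractional, e.g.
n=10 and numBin=4 give pos=2.5 for the first boundary, which rounds
up to the third sorted value.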