You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2022/02/03 19:53:52 UTC
[systemds] branch main updated: [SYSTEMDS-3289] Multithreaded equi-height binning
This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new fb5126a [SYSTEMDS-3289] Multithreaded equi-height binning
fb5126a is described below
commit fb5126a21ac249f04309f50c8ecab218c7104781
Author: arnabp <ar...@tugraz.at>
AuthorDate: Thu Feb 3 20:44:08 2022 +0100
[SYSTEMDS-3289] Multithreaded equi-height binning
This patch adds multithreaded support to equi-height binning
in transformencode/apply. We use partition-sorting and a
heap-based merging of sorted blocks.
Closes #1495.
---
...ltiReturnParameterizedBuiltinSPInstruction.java | 24 ++--
.../runtime/transform/encode/ColumnEncoderBin.java | 137 ++++++++++++++++-----
.../TransformFrameBuildMultithreadedTest.java | 28 +++--
3 files changed, 144 insertions(+), 45 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 9c8c08b..bca0825 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -315,16 +315,22 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
}
// handle bin boundaries
else if(_encoder.containsEncoderForID(colID, ColumnEncoderBin.class)) {
- double min = Double.MAX_VALUE;
- double max = -Double.MAX_VALUE;
- while(iter.hasNext()) {
- double value = Double.parseDouble(iter.next().toString());
- min = Math.min(min, value);
- max = Math.max(max, value);
- }
ColumnEncoderBin baEncoder = _encoder.getColumnEncoder(colID, ColumnEncoderBin.class);
- assert baEncoder != null;
- baEncoder.computeBins(min, max);
+ if (baEncoder.getBinMethod() == ColumnEncoderBin.BinMethod.EQUI_WIDTH) {
+ double min = Double.MAX_VALUE;
+ double max = -Double.MAX_VALUE;
+ while(iter.hasNext()) {
+ double value = Double.parseDouble(iter.next().toString());
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ assert baEncoder != null;
+ baEncoder.computeBins(min, max);
+ }
+ else //TODO: support equi-height
+ throw new DMLRuntimeException("Binning method "+baEncoder.getBinMethod().toString()
+ +" is not support for Spark");
+
double[] binMins = baEncoder.getBinMins();
double[] binMaxs = baEncoder.getBinMaxs();
for(int i = 0; i < binMins.length; i++) {
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index fdc1ad6..cb6e0af 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -26,6 +26,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.tuple.MutableTriple;
@@ -43,7 +44,6 @@ public class ColumnEncoderBin extends ColumnEncoder {
private static final long serialVersionUID = 1917445005206076078L;
protected int _numBin = -1;
private BinMethod _binMethod = BinMethod.EQUI_WIDTH;
- private double[] _sortedInput = null;
// frame transform-apply attributes
// a) column bin boundaries
@@ -86,6 +86,17 @@ public class ColumnEncoderBin extends ColumnEncoder {
return _binMaxs;
}
+ public BinMethod getBinMethod() {
+ return _binMethod;
+ }
+
+ public void setBinMethod(String method) {
+ if (method.equalsIgnoreCase(BinMethod.EQUI_WIDTH.toString()))
+ _binMethod = BinMethod.EQUI_WIDTH;
+ if (method.equalsIgnoreCase(BinMethod.EQUI_HEIGHT.toString()))
+ _binMethod = BinMethod.EQUI_HEIGHT;
+ }
+
@Override
public void build(CacheBlock in) {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -96,8 +107,8 @@ public class ColumnEncoderBin extends ColumnEncoder {
computeBins(pairMinMax[0], pairMinMax[1]);
}
else if(_binMethod == BinMethod.EQUI_HEIGHT) {
- prepareDataForEqualHeightBins(in, _colID, 0, -1);
- computeEqualHeightBins();
+ double[] sortedCol = prepareDataForEqualHeightBins(in, _colID, 0, -1);
+ computeEqualHeightBins(sortedCol);
}
if(DMLScript.STATISTICS)
@@ -177,18 +188,19 @@ public class ColumnEncoderBin extends ColumnEncoder {
return new double[] {min, max};
}
- private void prepareDataForEqualHeightBins(CacheBlock in, int colID, int startRow, int blockSize) {
- int numRows = getEndIndex(in.getNumRows(), startRow, blockSize) - startRow;
- _sortedInput = new double[numRows];
- for(int i = startRow; i < numRows; i++) {
+ private static double[] prepareDataForEqualHeightBins(CacheBlock in, int colID, int startRow, int blockSize) {
+ int endRow = getEndIndex(in.getNumRows(), startRow, blockSize);
+ double[] vals = new double[endRow-startRow];
+ for(int i = startRow; i < endRow; i++) {
double inVal = in.getDouble(i, colID - 1);
//FIXME current NaN handling introduces 0s and thus
// impacts the computation of bin boundaries
if(Double.isNaN(inVal))
continue;
- _sortedInput[i] = inVal;
+ vals[i-startRow] = inVal;
}
- Arrays.sort(_sortedInput);
+ Arrays.sort(vals);
+ return vals;
}
@Override
@@ -197,9 +209,9 @@ public class ColumnEncoderBin extends ColumnEncoder {
}
@Override
- public Callable<Object> getPartialBuildTask(CacheBlock in, int startRow, int blockSize,
+ public Callable<Object> getPartialBuildTask(CacheBlock in, int startRow, int blockSize,
HashMap<Integer, Object> ret) {
- return new BinPartialBuildTask(in, _colID, startRow, blockSize, ret);
+ return new BinPartialBuildTask(in, _colID, startRow, blockSize, _binMethod, ret);
}
@Override
@@ -219,20 +231,20 @@ public class ColumnEncoderBin extends ColumnEncoder {
}
}
- private void computeEqualHeightBins() {
+ private void computeEqualHeightBins(double[] sortedCol) {
if(_binMins == null || _binMaxs == null) {
_binMins = new double[_numBin];
_binMaxs = new double[_numBin];
}
- int n = _sortedInput.length;
+ int n = sortedCol.length;
for(int i = 0; i < _numBin; i++) {
double pos = n * (i + 1d) / _numBin;
_binMaxs[i] = (pos % 1 == 0) ? // pos is integer
- _sortedInput[(int) pos-1] :
- _sortedInput[(int) Math.floor(pos)];
+ sortedCol[(int) pos-1] :
+ sortedCol[(int) Math.floor(pos)];
}
- _binMaxs[_numBin-1] = _sortedInput[n-1];
- _binMins[0] = _sortedInput[0];
+ _binMaxs[_numBin-1] = sortedCol[n-1];
+ _binMins[0] = sortedCol[0];
System.arraycopy(_binMaxs, 0, _binMins, 1, _numBin - 1);
}
@@ -324,6 +336,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
super.writeExternal(out);
out.writeInt(_numBin);
+ out.writeUTF(_binMethod.toString());
out.writeBoolean(_binMaxs != null);
if(_binMaxs != null) {
for(int j = 0; j < _binMaxs.length; j++) {
@@ -337,6 +350,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
public void readExternal(ObjectInput in) throws IOException {
super.readExternal(in);
_numBin = in.readInt();
+ setBinMethod(in.readUTF());
boolean minmax = in.readBoolean();
_binMaxs = minmax ? new double[_numBin] : null;
_binMins = minmax ? new double[_numBin] : null;
@@ -385,24 +399,34 @@ public class ColumnEncoderBin extends ColumnEncoder {
private final int _blockSize;
private final int _startRow;
private final int _colID;
- private final HashMap<Integer, Object> _partialMinMax;
+ private final BinMethod _method;
+ private final HashMap<Integer, Object> _partialData;
// if a pool is passed the task may be split up into multiple smaller tasks.
protected BinPartialBuildTask(CacheBlock input, int colID, int startRow,
- int blocksize, HashMap<Integer, Object> partialMinMax) {
+ int blocksize, BinMethod method, HashMap<Integer, Object> partialData) {
_input = input;
_blockSize = blocksize;
_colID = colID;
_startRow = startRow;
- _partialMinMax = partialMinMax;
+ _method = method;
+ _partialData = partialData;
}
@Override
public double[] call() throws Exception {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
- double[] minMax = getMinMaxOfCol(_input, _colID, _startRow, _blockSize);
- synchronized (_partialMinMax){
- _partialMinMax.put(_startRow, minMax);
+ if (_method == BinMethod.EQUI_WIDTH) {
+ double[] minMax = getMinMaxOfCol(_input, _colID, _startRow, _blockSize);
+ synchronized(_partialData) {
+ _partialData.put(_startRow, minMax);
+ }
+ }
+ if (_method == BinMethod.EQUI_HEIGHT) {
+ double[] sortedVals = prepareDataForEqualHeightBins(_input, _colID, _startRow, _blockSize);
+ synchronized(_partialData) {
+ _partialData.put(_startRow, sortedVals);
+ }
}
if (DMLScript.STATISTICS)
TransformStatistics.incBinningBuildTime(System.nanoTime()-t0);
@@ -424,16 +448,56 @@ public class ColumnEncoderBin extends ColumnEncoder {
_encoder = encoderBin;
}
+ private double[] mergeKSortedArrays(double[][] arrs) {
+ //PriorityQueue is heap in Java
+ PriorityQueue<ArrayContainer> queue;
+ queue = new PriorityQueue<>();
+ int total=0;
+
+ //add arrays to heap
+ for(double[] arr : arrs) {
+ queue.add(new ArrayContainer(arr, 0));
+ total = total + arr.length;
+ }
+ int m=0;
+ double[] result = new double[total];
+
+ //while heap is not empty
+ while(!queue.isEmpty()){
+ ArrayContainer ac = queue.poll();
+ result[m++]=ac.arr[ac.index];
+ if(ac.index < ac.arr.length-1){
+ queue.add(new ArrayContainer(ac.arr, ac.index+1));
+ }
+ }
+ return result;
+ }
+
@Override
public Object call() throws Exception {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
- double min = Double.POSITIVE_INFINITY;
- double max = Double.NEGATIVE_INFINITY;
- for(Object minMax : _partialMaps.values()) {
- min = Math.min(min, ((double[]) minMax)[0]);
- max = Math.max(max, ((double[]) minMax)[1]);
+ if (_encoder.getBinMethod() == BinMethod.EQUI_WIDTH) {
+ double min = Double.POSITIVE_INFINITY;
+ double max = Double.NEGATIVE_INFINITY;
+ for(Object minMax : _partialMaps.values()) {
+ min = Math.min(min, ((double[]) minMax)[0]);
+ max = Math.max(max, ((double[]) minMax)[1]);
+ }
+ _encoder.computeBins(min, max);
+ }
+
+ if (_encoder.getBinMethod() == BinMethod.EQUI_HEIGHT) {
+ double[][] allParts = new double[_partialMaps.size()][];
+ int i = 0;
+ for (Object arr: _partialMaps.values())
+ allParts[i++] = (double[]) arr;
+
+ // Heap-based merging of sorted partitions.
+ // TODO: Derive bin boundaries from partial aggregates, avoiding
+ // materializing the sorted arrays (e.g. federated quantile)
+ double[] sortedRes = mergeKSortedArrays(allParts);
+ _encoder.computeEqualHeightBins(sortedRes);
}
- _encoder.computeBins(min, max);
if(DMLScript.STATISTICS)
TransformStatistics.incBinningBuildTime(System.nanoTime()-t0);
@@ -446,6 +510,21 @@ public class ColumnEncoderBin extends ColumnEncoder {
}
}
+ private static class ArrayContainer implements Comparable<ArrayContainer> {
+ double[] arr;
+ int index;
+
+ public ArrayContainer(double[] arr, int index) {
+ this.arr = arr;
+ this.index = index;
+ }
+
+ @Override
+ public int compareTo(ArrayContainer o) {
+ return this.arr[this.index] < o.arr[o.index] ? -1 : 1;
+ }
+ }
+
private static class ColumnBinBuildTask implements Callable<Object> {
private final ColumnEncoderBin _encoder;
private final CacheBlock _input;
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
index 0c235d6..27d7c78 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
@@ -52,7 +52,8 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
//private final static String SPEC1b = "homes3/homes.tfspec_recode2.json";
private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
//private final static String SPEC2b = "homes3/homes.tfspec_dummy2.json";
- private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // recode
+ private final static String SPEC3a = "homes3/homes.tfspec_bin.json"; // equi-width binning
+ private final static String SPEC3b = "homes3/homes.tfspec_bin_height.json"; // equi-height binning
//private final static String SPEC3b = "homes3/homes.tfspec_bin2.json"; // recode
private final static String SPEC6 = "homes3/homes.tfspec_recode_dummy.json";
//private final static String SPEC6b = "homes3/homes.tfspec_recode_dummy2.json";
@@ -65,7 +66,7 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
private final static String SPEC10 = "homes3/homes.tfspec_recode_bin.json";
public enum TransformType {
- RECODE, DUMMY, RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE, RECODE_BIN,
+ RECODE, DUMMY, RECODE_DUMMY, BIN_WIDTH, BIN_HEIGHT, BIN_DUMMY, HASH, HASH_RECODE, RECODE_BIN,
}
@Override
@@ -101,12 +102,21 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
@Test
public void testHomesBuildBinSingleNodeCSV() {
- runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN, 0);
+ runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_WIDTH, 0);
}
@Test
public void testHomesBuild50BinSingleNodeCSV() {
- runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN, 50);
+ runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_WIDTH, 50);
+ }
+ @Test
+ public void testHomesBuildBinEQHTCSV() {
+ runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_HEIGHT, 0);
+ }
+
+ @Test
+ public void testHomesBuild50BinEQHTCSV() {
+ runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_HEIGHT, 50);
}
@Test
@@ -132,8 +142,12 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
SPEC = SPEC2;
DATASET = DATASET1;
break;
- case BIN:
- SPEC = SPEC3;
+ case BIN_WIDTH:
+ SPEC = SPEC3a;
+ DATASET = DATASET1;
+ break;
+ case BIN_HEIGHT:
+ SPEC = SPEC3b;
DATASET = DATASET1;
break;
case RECODE_DUMMY:
@@ -191,7 +205,7 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
assertEquals(encodersS.get(i).getRcdMap().keySet(), encodersM.get(i).getRcdMap().keySet());
}
}
- else if(type == TransformType.BIN) {
+ else if(type == TransformType.BIN_WIDTH || type == TransformType.BIN_HEIGHT) {
List<ColumnEncoderBin> encodersS = encoderS.getColumnEncoders(ColumnEncoderBin.class);
List<ColumnEncoderBin> encodersM = encoderM.getColumnEncoders(ColumnEncoderBin.class);
assertEquals(encodersS.size(), encodersM.size());