You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/12/28 14:56:04 UTC

[systemds] 01/02: [SYSTEMDS-3262] CLA Offset memorizer

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 5873e09468d9e1252b25c3b8849e19376d21143b
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Sat Dec 25 16:38:59 2021 +0100

    [SYSTEMDS-3262] CLA Offset memorizer
    
    This commit reintroduces a memorizer for the offsets. If an offset is
    requested and it is not found in the thread-local cache, a global
    static memorizer is populated with an iterator for that index.
    This allows queries to first try the thread-local version of the
    cache, use it if it is appropriate, and otherwise fall back to the
    memorizer.
    
    If operations stick to using the thread-local cache once jobs are allocated,
    this implementation is memory friendly and efficient. But if threads are
    allocated small jobs that in turn populate this cache, it becomes bad.
    In practice, decompression is now designed to hit the memorizer only once,
    because each thread has a portion of rows to process and no extra jobs
    are allocated, making the memorizer size Iterator * Threads.
---
 .../runtime/compress/colgroup/ColGroupDDC.java     | 29 +++++++++++----
 .../runtime/compress/colgroup/ColGroupSDC.java     | 12 ++++---
 .../compress/colgroup/offset/AIterator.java        | 11 ------
 .../runtime/compress/colgroup/offset/AOffset.java  | 20 +++++++++++
 .../compress/colgroup/offset/OffsetByte.java       | 41 ++++++++++++++++------
 .../runtime/compress/lib/CLALibDecompress.java     | 25 +++++++------
 .../component/compress/offset/OffsetTests.java     |  5 +--
 7 files changed, 97 insertions(+), 46 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index c7e2a34..8b9888f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -72,16 +72,31 @@ public class ColGroupDDC extends APreAgg {
 	@Override
 	protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
 		double[] values) {
-		final int nCol = _colIndexes.length;
-		for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
-			final double[] c = db.values(offT);
-			final int off = db.pos(offT) + offC;
-			final int rowIndex = _data.getIndex(i) * nCol;
-			for(int j = 0; j < nCol; j++)
-				c[off + _colIndexes[j]] += values[rowIndex + j];
+		if(db.isContiguous() && _colIndexes.length == 1)
+			decompressToDenseBlockDenseDictSingleColContiguous(db, rl, ru, offR, offC, values);
+		else {
+			// generic
+			final int nCol = _colIndexes.length;
+			for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
+				final double[] c = db.values(offT);
+				final int off = db.pos(offT) + offC;
+				final int rowIndex = _data.getIndex(i) * nCol;
+				for(int j = 0; j < nCol; j++)
+					c[off + _colIndexes[j]] += values[rowIndex + j];
+			}
 		}
 	}
 
+	private void decompressToDenseBlockDenseDictSingleColContiguous(DenseBlock db, int rl, int ru, int offR, int offC,
+		double[] values) {
+		final double[] c = db.values(0);
+		final int nCols = db.getDim(1);
+		final int colOff = _colIndexes[0] + offC;
+		for(int i = rl, offT = (rl + offR) * nCols + colOff; i < ru; i++, offT += nCols)
+			c[offT] += values[_data.getIndex(i)];
+
+	}
+
 	@Override
 	protected void decompressToSparseBlockSparseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
 		SparseBlock sb) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
index 759252a..e825756 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
@@ -123,8 +123,10 @@ public class ColGroupSDC extends AMorphingMMColGroup {
 		}
 		else if(it != null) {
 			while(r < ru) {
-				if(it.value() == r)
-					c[r] += preAgg[data.getIndex(it.getDataIndexAndIncrement())];
+				if(it.value() == r){
+					c[r] += preAgg[data.getIndex(it.getDataIndex())];
+					it.next();
+				}
 				else
 					c[r] += def;
 				r++;
@@ -168,8 +170,10 @@ public class ColGroupSDC extends AMorphingMMColGroup {
 		}
 		else if(it != null) {
 			while(r < ru) {
-				if(it.value() == r)
-					c[r] = builtin.execute(c[r], vals[data.getIndex(it.getDataIndexAndIncrement())]);
+				if(it.value() == r){
+					c[r] = builtin.execute(c[r], vals[data.getIndex(it.getDataIndex())]);
+					it.next();
+				}
 				else
 					c[r] = builtin.execute(c[r], def);
 				r++;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AIterator.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AIterator.java
index 1c7e81e..bb4d13e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AIterator.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AIterator.java
@@ -92,17 +92,6 @@ public abstract class AIterator {
 	}
 
 	/**
-	 * Get the current data index and increment the pointers using the next operator.
-	 * 
-	 * @return The current data index.
-	 */
-	public int getDataIndexAndIncrement() {
-		int x = dataIndex;
-		next();
-		return x;
-	}
-
-	/**
 	 * Skip values until index is achieved.
 	 * 
 	 * @param idx The index to skip to.
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
index 112fbb1..328dcb5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
@@ -22,6 +22,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
@@ -44,6 +46,7 @@ public abstract class AOffset implements Serializable {
 
 	protected static final Log LOG = LogFactory.getLog(AOffset.class.getName());
 
+	/** Thread local cache for a single recently used Iterator, this is used for cache blocking */
 	private ThreadLocal<OffsetCache> cacheRow = new ThreadLocal<OffsetCache>() {
 		@Override
 		protected OffsetCache initialValue() {
@@ -51,6 +54,9 @@ public abstract class AOffset implements Serializable {
 		}
 	};
 
+	/** Memorizer for the row indexes mostly used for when we parallelize across rows */
+	private Map<Integer, AIterator> memorizer = null;
+
 	/**
 	 * Get an iterator of the offsets.
 	 * 
@@ -73,19 +79,25 @@ public abstract class AOffset implements Serializable {
 		// try the cache first.
 		OffsetCache c = cacheRow.get();
 		if(c == null) {
+			if(memorizer != null && memorizer.containsKey(row))
+				return memorizer.get(row).clone();
 			AIterator it = getIterator();
 			it.skipTo(row);
 			cacheIterator(it.clone(), row);
+			memorizeIterator(it.clone(), row);
 			return it;
 		}
 		else if(c.row == row)
 			return c.it.clone();
 		else {
+			if(memorizer != null && memorizer.containsKey(row))
+				return memorizer.get(row).clone();
 			// Use the cached iterator if it is closer to the queried row.
 			AIterator it = c.row < row ? c.it.clone() : getIterator();
 			it.skipTo(row);
 			// cache this new iterator.
 			cacheIterator(it.clone(), row);
+			memorizeIterator(it.clone(), row);
 			return it;
 		}
 
@@ -103,6 +115,14 @@ public abstract class AOffset implements Serializable {
 		cacheRow.set(new OffsetCache(it, row));
 	}
 
+	private void memorizeIterator(AIterator it, int row) {
+		if(it == null)
+			return;
+		else if(memorizer == null)
+			memorizer = new HashMap<>();
+		memorizer.put(row, it);
+	}
+
 	/**
 	 * Write the offsets to disk.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetByte.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetByte.java
index e09c20b..4654cdb 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetByte.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetByte.java
@@ -35,6 +35,7 @@ public class OffsetByte extends AOffset {
 	private final int offsetToFirst;
 	private final int offsetToLast;
 	private final boolean noOverHalf;
+	private final boolean noZero;
 
 	public OffsetByte(int[] indexes) {
 		this(indexes, 0, indexes.length);
@@ -72,13 +73,9 @@ public class OffsetByte extends AOffset {
 
 			ov = nv;
 		}
-		boolean noOverHalf = true;
-		for(byte b : offsets)
-			if(b < 0) {
-				noOverHalf = false;
-				break;
-			}
-		this.noOverHalf = noOverHalf;
+
+		this.noOverHalf = getNoOverHalf();
+		this.noZero = getNoZero();
 	}
 
 	protected OffsetByte(byte[] offsets, int offsetToFirst, int offsetToLast) {
@@ -86,18 +83,29 @@ public class OffsetByte extends AOffset {
 		this.offsetToFirst = offsetToFirst;
 		this.offsetToLast = offsetToLast;
 		this.noOverHalf = getNoOverHalf();
+		this.noZero = getNoZero();
 	}
 
 	private boolean getNoOverHalf() {
 		boolean noOverHalf = true;
 		for(byte b : offsets)
-			if(b < 0) {
+			if(b < 1) {
 				noOverHalf = false;
 				break;
 			}
 		return noOverHalf;
 	}
 
+	private boolean getNoZero() {
+		boolean noZero = true;
+		for(byte b : offsets)
+			if(b == 0) {
+				noZero = false;
+				break;
+			}
+		return noZero;
+	}
+
 	@Override
 	public IterateByteOffset getIterator() {
 		return new IterateByteOffset();
@@ -172,7 +180,6 @@ public class OffsetByte extends AOffset {
 	protected final void preAggregateDenseMapRowByte(double[] mV, int off, double[] preAV, int cu, int nVal, byte[] data,
 		AIterator it) {
 		IterateByteOffset itb = (IterateByteOffset) it;
-		final boolean noZero = offsets.length == data.length - 1;
 		if(cu < offsetToLast + 1) {
 			final boolean nvalHalf = nVal < 127;
 			if(noOverHalf && noZero && nvalHalf)
@@ -616,8 +623,20 @@ public class OffsetByte extends AOffset {
 
 		@Override
 		public int skipTo(int idx) {
-			while(offset < idx && index < offsets.length)
-				next();
+			if(noOverHalf) {
+				while(offset < idx && index < offsets.length) {
+					byte v = offsets[index];
+					offset += v;
+					index++;
+				}
+				dataIndex = index;
+			}
+			else if(idx < offsetToLast)
+				while(offset < idx)
+					next();
+			else
+				while(offset < idx && index < offsets.length)
+					next();
 			return offset;
 		}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
index b91b38d..df1c285 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
@@ -147,11 +147,8 @@ public class CLALibDecompress {
 				ret.allocateDenseBlock();
 		}
 
-		// final int block = (int) Math.ceil((double) (CompressionSettings.BITMAP_BLOCK_SZ) / nCols);
-		// final int blklen = Math.max(block, 64);
-		final int blklen = 32;
-
-		// final int blklen = block > 1000 ? block + 1000 - block % 1000 : Math.max(64, block);
+		// final int blklen = Math.max(nRows / (k * 2), 512);
+		final int blklen = Math.max(nRows / k , 512);
 
 		// check if we are using filtered groups, and if we are not force constV to null
 		if(groups == filteredGroups)
@@ -317,13 +314,19 @@ public class CLALibDecompress {
 
 		@Override
 		public Long call() {
-			for(AColGroup grp : _colGroups)
-				grp.decompressToDenseBlock(_ret.getDenseBlock(), _rl, _ru);
-
-			if(_constV != null)
-				addVector(_ret, _constV, _eps, _rl, _ru);
+			final int blk = 1024;
+			long nnz = 0;
+			for(int b = _rl; b < _ru; b+= blk){
+				int e = Math.min(b + blk , _ru);
+				for(AColGroup grp : _colGroups)
+					grp.decompressToDenseBlock(_ret.getDenseBlock(), b, e);
+
+				if(_constV != null)
+					addVector(_ret, _constV, _eps, b, e);
+				nnz += _overlapping ? 0 : _ret.recomputeNonZeros(b, e - 1);
+			}
 
-			return _overlapping ? 0 : _ret.recomputeNonZeros(_rl, _ru - 1);
+			return nnz;
 		}
 	}
 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
index 344459c..7a476aa 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
@@ -330,7 +330,7 @@ public class OffsetTests {
 	}
 
 	@Test
-	public void testAskForLastElementP1IsNull(){
+	public void testAskForLastElementP1IsNull() {
 		if(data.length == 2)
 			assertTrue(o.getIterator(data[1] + 1) == null);
 	}
@@ -383,7 +383,8 @@ public class OffsetTests {
 	}
 
 	public void testGetDataAfterNextCombN(AIterator it) {
-		int d = it.getDataIndexAndIncrement();
+		int d = it.getDataIndex();
+		it.next();
 		assertEquals(d + 1, it.getDataIndex());
 	}